在 Pyspark 中将复杂的数据帧行划分为简单的行
Posted
技术标签:
【中文标题】在 Pyspark 中将复杂的数据帧行划分为简单的行【英文标题】:Dividing complex rows of dataframe to simple rows in Pyspark 【发布时间】:2016-07-11 05:50:06 【问题描述】:我有这个代码:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
sc = SparkContext()
sqlContext = SQLContext(sc)
documents = sqlContext.createDataFrame([
Row(id=1, title=[Row(value=u'cars', max_dist=1000)]),
Row(id=2, title=[Row(value=u'horse bus',max_dist=50), Row(value=u'normal bus',max_dist=100)]),
Row(id=3, title=[Row(value=u'Airplane', max_dist=5000)]),
Row(id=4, title=[Row(value=u'Bicycles', max_dist=20),Row(value=u'Motorbikes', max_dist=80)]),
Row(id=5, title=[Row(value=u'Trams', max_dist=15)])])
documents.show(truncate=False)
#+---+----------------------------------+
#|id |title |
#+---+----------------------------------+
#|1 |[[1000,cars]] |
#|2 |[[50,horse bus], [100,normal bus]]|
#|3 |[[5000,Airplane]] |
#|4 |[[20,Bicycles], [80,Motorbikes]] |
#|5 |[[15,Trams]] |
#+---+----------------------------------+
我需要将所有复合行(例如 2 和 4)拆分为多行,同时保留“id”,以获得如下结果:
#+---+----------------------------------+
#|id |title |
#+---+----------------------------------+
#|1 |[1000,cars] |
#|2 |[50,horse bus] |
#|2 |[100,normal bus] |
#|3 |[5000,Airplane] |
#|4 |[20,Bicycles] |
#|4 |[80,Motorbikes] |
#|5 |[15,Trams] |
#+---+----------------------------------+
【问题讨论】:
我没有时间写出答案(对不起!),但这是我正在构建的想法:将 ID 放入“标题”类别中的每个元素中(比如让每个复合行包含value
、max_dist
和id
),然后执行flatMap
。您将无法使用map
,因为map
期望输入和输出之间存在一对一的关系。如果以后有时间会充实一个答案!
谢谢@Katya Handler,我会试试你的想法。如果我不能,我会通过评论请求你的帮助:-)
【参考方案1】:
好的,这就是我想出的。不幸的是,我不得不离开Row
对象的世界并进入list
对象的世界,因为我找不到将append
到Row
对象的方法。
这意味着这个方法有点乱。如果您能找到将新列添加到 Row
对象的方法,那么这不是可行的方法。
def add_id(row):
it_list = []
for i in range(0, len(row[1])):
sm_list = []
for j in row[1][i]:
sm_list.append(j)
sm_list.append(row[0])
it_list.append(sm_list)
return it_list
with_id = documents.flatMap(lambda x: add_id(x))
df = with_id.map(lambda x: Row(id=x[2], title=Row(value=x[0], max_dist=x[1]))).toDF()
当我运行df.show()
时,我得到:
+---+----------------+
| id| title|
+---+----------------+
| 1| [cars,1000]|
| 2| [horse bus,50]|
| 2|[normal bus,100]|
| 3| [Airplane,5000]|
| 4| [Bicycles,20]|
| 4| [Motorbikes,80]|
| 5| [Trams,15]|
+---+----------------+
【讨论】:
如果我可以建议:def flatten(row): id_, items = row; return ((id_, item) for item in items)
后跟 documents.rdd.flatMap(flatten).toDF(["id", "title"])
。
再次感谢@Katya Handler,我喜欢你的原始想法 :-)【参考方案2】:
就explode
吧:
from pyspark.sql.functions import explode
documents.withColumn("title", explode("title"))
## +---+----------------+
## | id| title|
## +---+----------------+
## | 1| [1000,cars]|
## | 2| [50,horse bus]|
## | 2|[100,normal bus]|
## | 3| [5000,Airplane]|
## | 4| [20,Bicycles]|
## | 4| [80,Motorbikes]|
## | 5| [15,Trams]|
## +---+----------------+
【讨论】:
这是一个更好的答案@K.Ali 请再问一个问题@zero323,是否也可以通过结构的字段(col_1=1000,col_2='cars' 等)垂直分解结果列'title'? @K.Ali 这只是一个简单的选择(select("title.value", "title.max_dist")
)。
@zero323 想象一下,您想对结果做更多explode
。目标是两个有两列数字title_num
和另一个title_type
。 documents.withColumn("title", explode("title"))
还应该添加什么?这样的任务需要两次爆炸吗?【参考方案3】:
我正在使用 Spark Dataset API,以下解决了我的“爆炸”要求:
Dataset<Row> explodedDataset = initialDataset.selectExpr("ID","explode(finished_chunk) as chunks");
注意:Dataset API 的 explode 方法在 Spark 2.4.5 中已弃用,文档建议使用 Select(如上所示)或 FlatMap。
【讨论】:
以上是关于在 Pyspark 中将复杂的数据帧行划分为简单的行的主要内容,如果未能解决你的问题,请参考以下文章
Spark:如何从 Spark 数据帧行解析和转换 json 字符串