将一行拆分为多行pyspark
Posted
技术标签:
【中文标题】将一行拆分为多行pyspark【英文标题】:spliting a row to multiple row pyspark 【发布时间】:2018-06-06 07:15:34 【问题描述】:我有一个类似的数据框:
df = spark.createDataFrame([(0, "departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Pink","departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Blue"), (1, "departmentcode__10~#~p99189h8pk0__10484~#~prod_productcolor__Dustysalmon Black","departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Blue"), (2, "departmentcode__60~#~p99189h8pk0__10485~#~prod_productcolor__Dustysalmon White","departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Blue")], ["id", "items_base", "item_target"])
我需要一个类似于以下的新数据框:
+---+-----------------+----------------+--------+------+
|id |dept0 |att0 |position|flag |
+---+-----------------+----------------+--------+------+
|0 |departmentcode |50 |1 |Base |
|0 |p99189h8pk0 |10483 |2 |Base |
|0 |prod_productcolor|Dustysalmon Pink|3 |Base |
|0 |departmentcode |50 |1 |Target|
|0 |p99189h8pk0 |10483 |2 |Target|
|0 |prod_productcolor|Dustysalmon Blue|3 |Target|
|1 |departmentcode |10 |1 |Base |
...
...
+---+-----------------+----------------+--------+------+
我用 ~#~ 和 __ 拆分 items_base 和 item_target 并创建新的 6 行。 items_base 3行,item_target 3行(其中position为dept0拆分后的位置,flag表示是items_base还是items_target)
【问题讨论】:
【参考方案1】:您可以使用udf
函数进行拆分字符串的拆分和合并,最后使用explode
和select
函数将您的最终数据帧作为
from pyspark.sql import functions as f
from pyspark.sql import types as t
@f.udf(t.ArrayType(t.ArrayType(t.StringType())))
def splitUdf(base, target):
return [s.split("__") + [str(index+1), 'base'] for index, s in enumerate(base.split("~#~"))] + [s.split("__") + [str(index+1), 'target'] for index, s in enumerate(target.split("~#~"))]
df.withColumn('exploded', f.explode(splitUdf(f.col('items_base'), f.col('item_target'))))\
.select(f.col('id'), f.col('exploded')[0].alias('dept0'), f.col('exploded')[1].alias('att0'), f.col('exploded')[2].alias('position'), f.col('exploded')[3].alias('flag'))\
.show(truncate=False)
这应该给你
+---+-----------------+-----------------+--------+------+
|id |dept0 |att0 |position|flag |
+---+-----------------+-----------------+--------+------+
|0 |departmentcode |50 |1 |base |
|0 |p99189h8pk0 |10483 |2 |base |
|0 |prod_productcolor|Dustysalmon Pink |3 |base |
|0 |departmentcode |50 |1 |target|
|0 |p99189h8pk0 |10483 |2 |target|
|0 |prod_productcolor|Dustysalmon Blue |3 |target|
|1 |departmentcode |10 |1 |base |
|1 |p99189h8pk0 |10484 |2 |base |
|1 |prod_productcolor|Dustysalmon Black|3 |base |
|1 |departmentcode |50 |1 |target|
|1 |p99189h8pk0 |10483 |2 |target|
|1 |prod_productcolor|Dustysalmon Blue |3 |target|
|2 |departmentcode |60 |1 |base |
|2 |p99189h8pk0 |10485 |2 |base |
|2 |prod_productcolor|Dustysalmon White|3 |base |
|2 |departmentcode |50 |1 |target|
|2 |p99189h8pk0 |10483 |2 |target|
|2 |prod_productcolor|Dustysalmon Blue |3 |target|
+---+-----------------+-----------------+--------+------+
希望回答对你有帮助
更新
如果你从 udf 函数返回结构类型,则更好
@f.udf(t.ArrayType(t.StructType([t.StructField('dept0', t.StringType(), True), t.StructField('att0', t.StringType(), True), t.StructField('position', t.IntegerType(), True), t.StructField('flag', t.StringType(), True)])))
def splitUdf(base, target):
return [(s.split("__")[0], s.split("__")[1], index+1, 'base') for index, s in enumerate(base.split("~#~"))] + [(s.split("__")[0], s.split("__")[1], index+1, 'target') for index, s in enumerate(target.split("~#~"))]
df.withColumn('exploded', f.explode(splitUdf(f.col('items_base'), f.col('item_target'))))\
.select(f.col('id'), f.col('exploded.*'))\
.show(truncate=False)
这应该会给你同样的结果
【讨论】:
【参考方案2】:您必须执行很多步骤才能获得结果,但它们并不复杂。
base_df = df.select(
'id',
F.split('items_base', '~#~').alias('items_base')
).select(
'id',
F.posexplode('items_base')
).select(
'id',
F.split('col', '__').alias('items_base'),
(F.col('pos')+1).alias('position'),
F.lit('Base').alias('flag')
).select(
'id',
F.col('items_base').getItem(0).alias('dept0'),
F.col('items_base').getItem(1).alias('att0'),
'position',
'flag',
)
target_df = df.select(
'id',
F.split('item_target', '~#~').alias('item_target')
).select(
'id',
F.posexplode('item_target')
).select(
'id',
F.split('col', '__').alias('item_target'),
(F.col('pos')+1).alias('position'),
F.lit('Target').alias('flag')
).select(
'id',
F.col('item_target').getItem(0).alias('dept0'),
F.col('item_target').getItem(1).alias('att0'),
'position',
'flag',
)
base_df.union(target_df).show()
+---+-----------------+-----------------+--------+------+
| id| dept0| att0|position| flag|
+---+-----------------+-----------------+--------+------+
| 0| departmentcode| 50| 1| Base|
| 0| p99189h8pk0| 10483| 2| Base|
| 0|prod_productcolor| Dustysalmon Pink| 3| Base|
| 1| departmentcode| 10| 1| Base|
| 1| p99189h8pk0| 10484| 2| Base|
| 1|prod_productcolor|Dustysalmon Black| 3| Base|
| 2| departmentcode| 60| 1| Base|
| 2| p99189h8pk0| 10485| 2| Base|
| 2|prod_productcolor|Dustysalmon White| 3| Base|
| 0| departmentcode| 50| 1|Target|
| 0| p99189h8pk0| 10483| 2|Target|
| 0|prod_productcolor| Dustysalmon Blue| 3|Target|
| 1| departmentcode| 50| 1|Target|
| 1| p99189h8pk0| 10483| 2|Target|
| 1|prod_productcolor| Dustysalmon Blue| 3|Target|
| 2| departmentcode| 50| 1|Target|
| 2| p99189h8pk0| 10483| 2|Target|
| 2|prod_productcolor| Dustysalmon Blue| 3|Target|
+---+-----------------+-----------------+--------+------+
【讨论】:
【参考方案3】:您可以使用flatMap
将长度为 N 的 RDD 转换为 N 个集合的集合:
from pyspark.sql import Row
def etl(row) :
list_row = []
items_base = row.items_base.split('~#~')
for item in items_base:
row_items_base = Row(id = row.id, dept0 = item.split('__')[0], att0 = item.split('__')[1], position = items_base.index(item) + 1, flag = 'Base')
list_row.append(row_items_base)
item_target = row.item_target.split('~#~')
for item in item_target:
row_items_base = Row(id = row.id, dept0 = item.split('__')[0], att0 = item.split('__')[1], position = item_target.index(item) + 1, flag = 'Target')
list_row.append(row_items_base)
return list_row
df.rdd.flatMap(etl).toDF().show()
输出:
【讨论】:
以上是关于将一行拆分为多行pyspark的主要内容,如果未能解决你的问题,请参考以下文章