PySpark 数据框如何使用平面图
Posted
技术标签:
【中文标题】PySpark 数据框如何使用平面图【英文标题】:PySpark dataframe how to use flatmap 【发布时间】:2021-07-18 23:35:36 【问题描述】:我正在编写一个比较两个表的 PySpark 程序,比如说 Table1 和 Table2 两个表结构相同,但可能包含不同的数据
比方说,表 1 有以下列
key1, key2, col1, col2, col3
表1中的样本数据如下
"a", 1, "x1", "y1", "z1"
"a", 2, "x2", "y2", "z2"
"a", 3, "x3", "y3", "z3"
类似的表 2 有以下列
key1, key2, col1, col2, col3
表1中的样本数据如下
"a", 1, "x1", "y1", "z1"
"a", 2, "x21", "y21", "z2"
"a", 3, "x3", "y3", "z31"
程序创建一个包含以下列的数据框(比如说 df1)
Key1、Key2、a.Col1、a.Col2、a.Col3、b.Col1、b.Col2、b.Col3、column_names
示例数据:
"a", 2, "x2", "y2", "z2", "x21", "y21", "z2", "col1,col2"
"a", 3, "x3", "y3", "z3", "x3", "y3", "z31", "col3"
“column_names”列包含在 table1 和 table2 之间具有不同值的列
使用这个数据框,我需要创建另一个包含以下结构的数据框
key1、key2、field_in_difference、src_value、tgt_value
"a", 2, "col1", "x2", "x21"
"a", 2, "col2", "y2", "y21"
"a", 3, "col3", "z3", "z31"
我认为我需要在 PySpark 中使用 flatMap 我可以对数据框中的一列使用平面图,以便在结果数据框中创建多行吗?但剩余的列被复制到新行中?
我尝试使用以下语法,但语法似乎不正确
df2 = df1.withColumn("newcolumn", func.concat_ws(",", flatMap(lambda x: x.split(','))))
但我得到一个错误 NameErrorL name flatMap is not defined 不知道如何指定平面图需要在“column_names”列上完成,同时保持其余列不变..
我认为该方法是第一步在不同的列中创建一行 然后在第二步中,创建另一个将转换为预期输出的 df
非常感谢您的帮助
【问题讨论】:
【参考方案1】:flatMap
适用于 RDD,而不是 DataFrame。
我不太明白你想如何在 df1 上使用 flatMap
,但我认为直接从 Table 1 和 Table 2 em> 可能更容易。假设 Table 1 是 df_src
,Table 2 是 df_tgt
。
df_src.show()
+----+----+----+----+----+
|key1|key2|col1|col2|col3|
+----+----+----+----+----+
| a| 1| x1| y1| z1|
| a| 2| x2| y2| z2|
| a| 3| x3| y3| z3|
+----+----+----+----+----+
df_tgt.show()
+----+----+----+----+----+
|key1|key2|col1|col2|col3|
+----+----+----+----+----+
| a| 1| x1| y1| z1|
| a| 2| x21| y21| z2|
| a| 3| x3| y3| z31|
+----+----+----+----+----+
您可以使用 stack
函数、join
它们和 filter
它来取消透视这两个数据帧。
from pyspark.sql.functions import col
# unpivot col1, col2 and col3 of both dataframes. rename key columns as well
df_src = df_src.selectExpr("key1 key1_s", "key2 key2_s", "stack(3, 'col1', col1, 'col2', col2, 'col3', col3) (field_s, src_value)")
df_tgt = df_tgt.selectExpr("key1 key1_t", "key2 key2_t", "stack(3, 'col1', col1, 'col2', col2, 'col3', col3) (field_t, tgt_value)")
# join the dataframes on keys and field, then filter where field values are different
df_res = (df_src
.join(df_tgt,
[col('key1_s') == col('key1_t'), col('key2_s') == col('key2_t'), col('field_s') == col('field_t')],
'inner')
.filter(col('src_value') != col('tgt_value'))
.selectExpr('key1_s key1', 'key2_s key2', 'field_s field_in_difference', 'src_value', 'tgt_value')
)
df_res.show()
+----+----+-------------------+---------+---------+
|key1|key2|field_in_difference|src_value|tgt_value|
+----+----+-------------------+---------+---------+
| a| 2| col1| x2| x21|
| a| 2| col2| y2| y21|
| a| 3| col3| z3| z31|
+----+----+-------------------+---------+---------+
【讨论】:
以上是关于PySpark 数据框如何使用平面图的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 pyspark 2.1.0 选择另一个数据框中不存在的行?