PySpark 数据框如何使用平面图

Posted

技术标签:

【中文标题】PySpark 数据框如何使用平面图【英文标题】:PySpark dataframe how to use flatmap 【发布时间】:2021-07-18 23:35:36 【问题描述】:

我正在编写一个比较两个表的 PySpark 程序,比如说 Table1 和 Table2 两个表结构相同,但可能包含不同的数据

比方说,表 1 有以下列

key1, key2, col1, col2, col3

表1中的样本数据如下

"a", 1, "x1", "y1", "z1"
"a", 2, "x2", "y2", "z2"
"a", 3, "x3", "y3", "z3"

类似的表 2 有以下列

key1, key2, col1, col2, col3

表1中的样本数据如下

"a", 1, "x1", "y1", "z1"
"a", 2, "x21", "y21", "z2"
"a", 3, "x3", "y3", "z31"

程序创建一个包含以下列的数据框(比如说 df1)

Key1、Key2、a.Col1、a.Col2、a.Col3、b.Col1、b.Col2、b.Col3、column_names

示例数据:

"a", 2, "x2", "y2", "z2", "x21", "y21", "z2", "col1,col2"
"a", 3, "x3", "y3", "z3", "x3", "y3", "z31", "col3"

“column_names”列包含在 table1 和 table2 之间具有不同值的列

使用这个数据框,我需要创建另一个包含以下结构的数据框

key1、key2、field_in_difference、src_value、tgt_value

"a", 2, "col1", "x2", "x21"
"a", 2, "col2", "y2", "y21"
"a", 3, "col3", "z3", "z31"

我认为我需要在 PySpark 中使用 flatMap 我可以对数据框中的一列使用平面图,以便在结果数据框中创建多行吗?但剩余的列被复制到新行中?

我尝试使用以下语法,但语法似乎不正确

df2 = df1.withColumn("newcolumn", func.concat_ws(",", flatMap(lambda x: x.split(',')))) 

但我得到一个错误 NameErrorL name flatMap is not defined 不知道如何指定平面图需要在“column_names”列上完成,同时保持其余列不变..

我认为该方法是第一步在不同的列中创建一行 然后在第二步中,创建另一个将转换为预期输出的 df

非常感谢您的帮助

【问题讨论】:

【参考方案1】:

flatMap 适用于 RDD,而不是 DataFrame。

我不太明白你想如何在 df1 上使用 flatMap,但我认为直接从 Table 1Table 2 em> 可能更容易。假设 Table 1df_srcTable 2df_tgt

df_src.show()

+----+----+----+----+----+
|key1|key2|col1|col2|col3|
+----+----+----+----+----+
|   a|   1|  x1|  y1|  z1|
|   a|   2|  x2|  y2|  z2|
|   a|   3|  x3|  y3|  z3|
+----+----+----+----+----+

df_tgt.show()

+----+----+----+----+----+
|key1|key2|col1|col2|col3|
+----+----+----+----+----+
|   a|   1|  x1|  y1|  z1|
|   a|   2| x21| y21|  z2|
|   a|   3|  x3|  y3| z31|
+----+----+----+----+----+

您可以使用 stack 函数、join 它们和 filter 它来取消透视这两个数据帧。

from pyspark.sql.functions import col

# unpivot col1, col2 and col3 of both dataframes. rename key columns as well
df_src = df_src.selectExpr("key1 key1_s", "key2 key2_s", "stack(3, 'col1', col1, 'col2', col2, 'col3', col3) (field_s, src_value)")
df_tgt = df_tgt.selectExpr("key1 key1_t", "key2 key2_t", "stack(3, 'col1', col1, 'col2', col2, 'col3', col3) (field_t, tgt_value)")

# join the dataframes on keys and field, then filter where field values are different
df_res = (df_src
          .join(df_tgt, 
                [col('key1_s') == col('key1_t'), col('key2_s') == col('key2_t'), col('field_s') == col('field_t')], 
                'inner')
          .filter(col('src_value') != col('tgt_value'))
          .selectExpr('key1_s key1', 'key2_s key2', 'field_s field_in_difference', 'src_value', 'tgt_value')
          )
df_res.show()

+----+----+-------------------+---------+---------+
|key1|key2|field_in_difference|src_value|tgt_value|
+----+----+-------------------+---------+---------+
|   a|   2|               col1|       x2|      x21|
|   a|   2|               col2|       y2|      y21|
|   a|   3|               col3|       z3|      z31|
+----+----+-------------------+---------+---------+

【讨论】:

以上是关于PySpark 数据框如何使用平面图的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 pyspark 在数据块中循环数据框列

如何使用 pyspark 2.1.0 选择另一个数据框中不存在的行?

如何使用 pyspark 从文本日志文件的特定部分创建数据框

如何使用模式匹配从 pyspark 数据框中删除行?

如何使用pyspark将数据框保存在“.txt”文件中

Pyspark:如何遍历数据框列?