跨 PySpark DataFrame 列的字符串匹配

Posted

技术标签:

【中文标题】跨 PySpark DataFrame 列的字符串匹配【英文标题】:String matching across PySpark DataFrame columns 【发布时间】:2018-09-29 17:42:32 【问题描述】:

在给定参考表的情况下,我寻求标准化我的 DataFrame 的标题名称。

我的参考表是一个 DataFrame,其中包含行中的变量,标准名称和所有可能的变体名称为列:

+-------------+---------+---------+
|Standard_name|Variant_1|Variant_2|
+-------------+---------+---------+
|     Pressure|    Press|  Press_1|
|        Speed|   Speed_|     Rate|
+-------------+---------+---------+

假设我有一个包含这些列名称的数据的 DataFrame:

['Pressure', 'Rate', 'Altitude']

我想在我的参考 DataFrame 中查找这些变量名称中的每一个,如果存在则返回相应的 Standard_name,如果尚未在表中引用它,则保留原始变量。

因此,上述虚拟示例的预期结果应该是:

[Pressure, 'Speed', Altitude]

这在常规 Python Pandas 中很容易做到,但我不知道如何在 Spark 中做到这一点,因为你不应该考虑行索引。

非常感谢您的帮助。

【问题讨论】:

如果您的参考表不是那么大,最好collect它然后匹配名称。 【参考方案1】:

虽然我同意上面 mayank agrawal 的评论,但我试图仅通过转换来解决这个问题。

我改编this solution 以提取每个变体与大字典中的标准名称的所有成对对应关系。然后,我将字典映射到数据集标头到标准化标头的create a new column。

因此解决方案是:

from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
from itertools import chain


key_value_map = F.udf(lambda maps: key:f[key] for f in maps for key in f,
    MapType(StringType(),StringType()))


map_df = variable_df
    .agg(F.collect_list(F.create_map(list(chain.from_iterable([[key, 'Standard'] for key in var_df.columns[2:]])))).alias('maps')) 
    .agg(F.collect_list(key_value_map('maps')))

result_dict = map_df.collect()  
ref_dict = result_dict[0][0][0]

corresp_df = header_df
    .withColumn('new_header', F.create_map([F.lit(x) for x in chain(*ref_dict.items())]).getItem(F.col('old_header')))    
    .withColumn("new_header", F.coalesce(F.col('new_header'), F.col('old_header')))

new_columns = corresp_df.select('new_header').rdd.flatMap(lambda row : row).collect()

renamed_df = data_df.toDF(*new_columns)

参考资料:

Dataframe pyspark to dict

pyspark create new column with mapping from a dict

【讨论】:

以上是关于跨 PySpark DataFrame 列的字符串匹配的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:转换DataFrame中给定列的值

计算 PySpark DataFrame 列的模式?

基于pyspark中仅一列的两个DataFrame之间的差异[重复]

Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”

如何将 PySpark Dataframe 列的类型指定为 JSON

PySpark Dataframe:将一个单词附加到列的每个值