跨 PySpark DataFrame 列的字符串匹配
Posted
技术标签:
【中文标题】跨 PySpark DataFrame 列的字符串匹配【英文标题】:String matching across PySpark DataFrame columns 【发布时间】:2018-09-29 17:42:32 【问题描述】:在给定参考表的情况下,我寻求标准化我的 DataFrame 的标题名称。
我的参考表是一个 DataFrame,其中包含行中的变量,标准名称和所有可能的变体名称为列:
+-------------+---------+---------+
|Standard_name|Variant_1|Variant_2|
+-------------+---------+---------+
| Pressure| Press| Press_1|
| Speed| Speed_| Rate|
+-------------+---------+---------+
假设我有一个包含这些列名称的数据的 DataFrame:
['Pressure', 'Rate', 'Altitude']
我想在我的参考 DataFrame 中查找这些变量名称中的每一个,如果存在则返回相应的 Standard_name,如果尚未在表中引用它,则保留原始变量。
因此,上述虚拟示例的预期结果应该是:
[Pressure, 'Speed', Altitude]
这在常规 Python Pandas 中很容易做到,但我不知道如何在 Spark 中做到这一点,因为你不应该考虑行索引。
非常感谢您的帮助。
【问题讨论】:
如果您的参考表不是那么大,最好collect
它然后匹配名称。
【参考方案1】:
虽然我同意上面 mayank agrawal 的评论,但我试图仅通过转换来解决这个问题。
我改编this solution 以提取每个变体与大字典中的标准名称的所有成对对应关系。然后,我将字典映射到数据集标头到标准化标头的create a new column。
因此解决方案是:
from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
from itertools import chain
key_value_map = F.udf(lambda maps: key:f[key] for f in maps for key in f,
MapType(StringType(),StringType()))
map_df = variable_df
.agg(F.collect_list(F.create_map(list(chain.from_iterable([[key, 'Standard'] for key in var_df.columns[2:]])))).alias('maps'))
.agg(F.collect_list(key_value_map('maps')))
result_dict = map_df.collect()
ref_dict = result_dict[0][0][0]
corresp_df = header_df
.withColumn('new_header', F.create_map([F.lit(x) for x in chain(*ref_dict.items())]).getItem(F.col('old_header')))
.withColumn("new_header", F.coalesce(F.col('new_header'), F.col('old_header')))
new_columns = corresp_df.select('new_header').rdd.flatMap(lambda row : row).collect()
renamed_df = data_df.toDF(*new_columns)
参考资料:
Dataframe pyspark to dict
pyspark create new column with mapping from a dict
【讨论】:
以上是关于跨 PySpark DataFrame 列的字符串匹配的主要内容,如果未能解决你的问题,请参考以下文章
基于pyspark中仅一列的两个DataFrame之间的差异[重复]
Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”