How to separate CSV values within a cell of a pyspark dataframe into new columns and their values respectively


Posted: 2021-10-05 15:22:50

[Problem description]:

The current spark dataframe has CSV values at the cell level in one column, and I am trying to explode these into new columns. Sample dataframe:

    a_id                                    features
1   2020     "a","b","c","d","constant1","1","0.1","aa"
2   2021     "a","b","c","d","constant2","1","0.2","ab"
3   2022     "a","b","c","d","constant3","1","0.3","ac","a","b","c","d","constant3","1.1","3.3","acx"
4   2023     "a","b","c","d","constant4","1","0.4","ad"
5   2024     "a","b","c","d","constant5","1","0.5","ae","a","b","c","d","constant5","1.2","6.3","xwy","a","b","c","d","constant5","2.2","8.3","bunr"
6   2025     "a","b","c","d","constant6","1","0.6","af"

The features column holds multiple CSV records in which (a, b, c, d) act as headers; these headers repeat within some cells (rows 3 and 5), and I want to extract each header only once along with its respective values. The expected output dataframe is shown below.

Output spark dataframe:

    a_id       a        d
1   2020   constant1   ["aa"]
2   2021   constant2   ["ab"]
3   2022   constant3   ["ac","acx"]
4   2023   constant4   ["ad"]
5   2024   constant5   ["ae","xwy","bunr"]
6   2025   constant6   ["af"]

As shown, I want to extract only the a and d headers as new columns, where a holds the constant and d collects its multiple values as a list.

Please help with how to transform this in pyspark. The above dataframe is a real-time streaming dataframe.

[Question comments]:

You could write user-defined functions to parse the rows (one per column): the first fills column a by checking for the "constant" string, and the second fills column d by joining the elements at every index % 7 of the string split on ",". — Thanks, but I would appreciate it if you could show in code how to get "d".

[Answer 1]:

Using only Pyspark/Spark SQL functions:

- Remove the headers from the string
- Use regexp_extract_all to extract the substrings, splitting the string into a new substring after every fourth ,
- explode the result and drop empty rows
- split the result again; now each csv value is an element of an array
- Create columns a and d from the first and fourth elements of the array
- Group by a_id
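The extraction step above can be illustrated in plain Python with the same regex (a sketch using Python's re module, not part of the original answer):

```python
import re

# The pattern (([^,]*,?){4}) matches runs of four comma-separated values;
# this mirrors what regexp_extract_all does in the Spark snippet below.
body = '"constant3","1","0.3","ac","constant3","1.1","3.3","acx"'
chunks = [m.group(1) for m in re.finditer(r'(([^,]*,?){4})', body)]
chunks = [c for c in chunks if c]  # drop the zero-width match at end of string
print(chunks)
# ['"constant3","1","0.3","ac",', '"constant3","1.1","3.3","acx"']
```

Each chunk is then split on "," so the constant and the fourth value can be pulled out by index.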
from pyspark.sql import functions as F

header='"a","b","c","d",'
num_headers = header.count(",")

df.withColumn("features", F.expr(f"replace(features, '{header}')")) \
  .withColumn("features", F.expr(f"regexp_extract_all(features, '(([^,]*,?){{{num_headers}}})')")) \
  .withColumn("features", F.explode("features"))\
  .filter("not features =''") \
  .withColumn("features", F.split("features", ",")) \
  .withColumn("a", F.expr("features[0]")) \
  .withColumn("d", F.expr("features[3]")) \
  .groupBy("a_id") \
  .agg(F.first("a").alias("a"), F.collect_list("d").alias("d")) \
  .show(truncate=False)
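One detail worth flagging: when the regexp_extract_all pattern is built with an f-string, the regex quantifier braces must be doubled so they survive interpolation. A quick check of the resulting pattern string:

```python
# Doubled braces {{ }} in an f-string produce literal { } in the output,
# so the quantifier ends up as {4}: four comma-separated values per record.
header = '"a","b","c","d",'
num_headers = header.count(",")
pattern = f"(([^,]*,?){{{num_headers}}})"
print(pattern)  # (([^,]*,?){4})
```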

Output:

+----+----------+---------------------+
|a_id|a         |d                    |
+----+----------+---------------------+
|2020|"constant"|["aa"]               |
|2022|"constant"|["ac", "acx"]        |
|2025|"constant"|["af"]               |
|2023|"constant"|["ad"]               |
|2021|"constant"|["ab"]               |
|2024|"constant"|["ae", "xwy", "bunr"]|
+----+----------+---------------------+
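As an alternative, the UDF route suggested in the question comments could be sketched as follows. This is a plain-Python sketch of the parsing logic such a UDF would wrap (the function name and the quote stripping are illustrative, not from the original post); in Spark it would be registered via pyspark.sql.functions.udf.

```python
HEADER = '"a","b","c","d",'  # the repeated header run to strip

def parse_features(features):
    """Strip every header run, then read records of four quoted values each."""
    body = features.replace(HEADER, "")
    values = [v.strip('"') for v in body.split(",")]
    records = [values[i:i + 4] for i in range(0, len(values), 4)]
    a = records[0][0]                 # the "constant" field
    d = [rec[3] for rec in records]   # the last field of every record
    return a, d

print(parse_features('"a","b","c","d","constant5","1","0.5","ae",'
                     '"a","b","c","d","constant5","1.2","6.3","xwy",'
                     '"a","b","c","d","constant5","2.2","8.3","bunr"'))
# ('constant5', ['ae', 'xwy', 'bunr'])
```

Unlike the Spark SQL pipeline above, this variant strips the surrounding quotes, so a comes out as constant5 rather than "constant5".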

[Answer comments]:
