Pyspark DataFrame - 转义 &

Posted

技术标签:

【中文标题】Pyspark DataFrame - 转义 &【英文标题】:Pyspark DataFrame - Escaping & 【发布时间】:2021-08-23 12:20:56 【问题描述】:

我有一些使用分号作为分隔符的大型 (~150 GB) csv 文件。我发现某些字段包含一个 html 编码的 & 符号 & 分号被用作列分隔符,因此我需要在加载数据帧时将其转义或将 & 替换为 &

例如,我有以下 csv 文件:

ID;FirstName;LastName
1;Chandler;Bing
2;Ross & Monica;Geller

我使用以下笔记本加载它:

df = spark.read.option("delimiter", ";").option("header","true").csv('/mnt/input/AMP test.csv')
df.show()

我得到的结果是:

+---+---------+--------+
| ID|FirstName|LastName|
+---+---------+--------+
|  1| Chandler|    Bing|
|  2|Ross &amp|  Monica|
+---+---------+--------+

而我正在寻找的是:

+---+-------------+--------+
| ID|    FirstName|LastName|
+---+-------------+--------+
|  1|     Chandler|    Bing|
|  2|Ross & Monica|  Geller|
+---+-------------+--------+

我尝试过使用.option("escape", "&"),但转义只适用于单个字符。

更新

我有一个使用 RDD 的 hacky 解决方法,它至少适用于小型测试文件,但我仍在寻找合适的解决方案,在加载数据帧时转义字符串。

rdd = sc.textFile('/mnt/input/AMP test.csv')
rdd = rdd.map(lambda x: x.replace('&', '&'))

rdd.coalesce(1).saveAsTextFile("/mnt/input/AMP test escaped.csv")

df = spark.read.option("delimiter", ";").option("header","true").csv('/mnt/input/AMP test escaped.csv')
df.show()

【问题讨论】:

【参考方案1】:

您可以直接使用数据框执行此操作。如果您知道至少有 1 个不包含任何 & 的文件来检索架构,这会有所帮助。

假设存在这样一个文件,并且它的路径是“valid.csv”。

from pyspark.sql import functions as F

# I acquire a valid file without the & wrong data to get a nice schema
schm = spark.read.csv("valid.csv", header=True, inferSchema=True, sep=";").schema


df = spark.read.text("/mnt/input/AMP test.csv")

# I assume you have several files, so I remove all the headers.
# I do not need them as I already have my schema in schm.
header = df.first().value
df = df.where(F.col("value") != header)


# I replace "&" with "&", and split the column
df = df.withColumn(
    "value", F.regexp_replace(F.col("value"), "&", "&")
).withColumn(
    "value", F.split("value", ";")
)

# I explode the array in several columns and add types based on schm defined previously
df = df.select(
    *(
        F.col("value").getItem(i).cast(col.dataType).alias(col.name)
        for i, col in enumerate(schm)
    )
)

结果如下:

df.show()
+---+-------------+--------+
| ID|    FirstName|LastName|
+---+-------------+--------+
|  1|     Chandler|    Bing|
|  2|Ross & Monica|  Geller|
+---+-------------+--------+

df.printSchema()
root
 |-- ID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)

【讨论】:

【参考方案2】:

我认为没有办法仅使用spark.read.csv 来逃避这个复杂的字符&,解决方案就像你做了你的“解决方法”:

rdd.map:此函数已将所有列中的值& 替换为& 不需要将你的rdd保存在临时路径中,只需将其作为csv参数传递即可:
rdd = sc.textFile("your_path").map(lambda x: x.replace("&", "&"))

df = spark.read.csv(rdd, header=True, sep=";")
df.show()

+---+-------------+--------+
| ID|    FirstName|LastName|
+---+-------------+--------+
|  1|     Chandler|    Bing|
|  2|Ross & Monica|  Geller|
+---+-------------+--------+

【讨论】:

以上是关于Pyspark DataFrame - 转义 &的主要内容,如果未能解决你的问题,请参考以下文章

来自Python Dictionary的PySpark Dataframe没有Pandas

PySpark DataFrame的逐行聚合

Pyspark Dataframe 上的 Pivot String 列

PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值

使用 PySpark 删除 Dataframe 的嵌套列

PySpark - 如何使用连接更新 Dataframe?