Pyspark DataFrame - 转义 &
Posted
技术标签:
【中文标题】Pyspark DataFrame - 转义 &【英文标题】:Pyspark DataFrame - Escaping & 【发布时间】:2021-08-23 12:20:56 【问题描述】:我有一些使用分号作为分隔符的大型 (~150 GB) csv 文件。我发现某些字段包含一个 html 编码的 & 符号 &
分号被用作列分隔符,因此我需要在加载数据帧时将其转义或将 &
替换为 &
。
例如,我有以下 csv 文件:
ID;FirstName;LastName
1;Chandler;Bing
2;Ross & Monica;Geller
我使用以下笔记本加载它:
df = spark.read.option("delimiter", ";").option("header","true").csv('/mnt/input/AMP test.csv')
df.show()
我得到的结果是:
+---+---------+--------+
| ID|FirstName|LastName|
+---+---------+--------+
| 1| Chandler| Bing|
| 2|Ross &| Monica|
+---+---------+--------+
而我正在寻找的是:
+---+-------------+--------+
| ID| FirstName|LastName|
+---+-------------+--------+
| 1| Chandler| Bing|
| 2|Ross & Monica| Geller|
+---+-------------+--------+
我尝试过使用.option("escape", "&")
,但转义只适用于单个字符。
更新
我有一个使用 RDD 的 hacky 解决方法,它至少适用于小型测试文件,但我仍在寻找合适的解决方案,在加载数据帧时转义字符串。
rdd = sc.textFile('/mnt/input/AMP test.csv')
rdd = rdd.map(lambda x: x.replace('&', '&'))
rdd.coalesce(1).saveAsTextFile("/mnt/input/AMP test escaped.csv")
df = spark.read.option("delimiter", ";").option("header","true").csv('/mnt/input/AMP test escaped.csv')
df.show()
【问题讨论】:
【参考方案1】:您可以直接使用数据框执行此操作。如果您知道至少有 1 个不包含任何 &
的文件来检索架构,这会有所帮助。
假设存在这样一个文件,并且它的路径是“valid.csv”。
from pyspark.sql import functions as F
# I acquire a valid file without the & wrong data to get a nice schema
schm = spark.read.csv("valid.csv", header=True, inferSchema=True, sep=";").schema
df = spark.read.text("/mnt/input/AMP test.csv")
# I assume you have several files, so I remove all the headers.
# I do not need them as I already have my schema in schm.
header = df.first().value
df = df.where(F.col("value") != header)
# I replace "&" with "&", and split the column
df = df.withColumn(
"value", F.regexp_replace(F.col("value"), "&", "&")
).withColumn(
"value", F.split("value", ";")
)
# I explode the array in several columns and add types based on schm defined previously
df = df.select(
*(
F.col("value").getItem(i).cast(col.dataType).alias(col.name)
for i, col in enumerate(schm)
)
)
结果如下:
df.show()
+---+-------------+--------+
| ID| FirstName|LastName|
+---+-------------+--------+
| 1| Chandler| Bing|
| 2|Ross & Monica| Geller|
+---+-------------+--------+
df.printSchema()
root
|-- ID: integer (nullable = true)
|-- FirstName: string (nullable = true)
|-- LastName: string (nullable = true)
【讨论】:
【参考方案2】:我认为没有办法仅使用spark.read.csv
来逃避这个复杂的字符&
,解决方案就像你做了你的“解决方法”:
rdd.map
:此函数已将所有列中的值&
替换为&
不需要将你的rdd保存在临时路径中,只需将其作为csv
参数传递即可:
rdd = sc.textFile("your_path").map(lambda x: x.replace("&", "&"))
df = spark.read.csv(rdd, header=True, sep=";")
df.show()
+---+-------------+--------+
| ID| FirstName|LastName|
+---+-------------+--------+
| 1| Chandler| Bing|
| 2|Ross & Monica| Geller|
+---+-------------+--------+
【讨论】:
以上是关于Pyspark DataFrame - 转义 &的主要内容,如果未能解决你的问题,请参考以下文章
来自Python Dictionary的PySpark Dataframe没有Pandas
Pyspark Dataframe 上的 Pivot String 列