如何访问 pyspark 中的“_corrupt_record”列?
Posted
技术标签:
【中文标题】如何访问 pyspark 中的“_corrupt_record”列?【英文标题】:How to access "_corrupt_record" column in pyspark? 【发布时间】:2021-07-08 02:18:37 【问题描述】:我在处理不良记录和文件 (CSV) 时遇到了一些问题。 这是我的 CSV 文件
+------+---+---+----+
| Name| ID|int|int2|
+------+---+---+----+
| Sohel| 1| 4| 33|
| Sohel| 2| 5| 56|
| Sohel| 3| 6| 576|
| Sohel| a| 7| 567|
|Sohel2| c| 7| 567|
+------+---+---+----+
我正在使用预定义架构读取此文件
schema = StructType([
StructField("Name",StringType(),True),
StructField("ID",IntegerType(),True),
StructField("int",IntegerType(),True),
StructField("int2",IntegerType(),True),
StructField("_corrupt_record", StringType(),True)
])
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema,
columnNameOfCorruptRecord='_corrupt_record')
结果是
+------+----+---+----+---------------+
| Name| ID|int|int2|_corrupt_record|
+------+----+---+----+---------------+
| Sohel| 1| 4| 33| null|
| Sohel| 2| 5| 56| null|
| Sohel| 3| 6| 576| null|
| Sohel|null| 7| 567| Sohel,a,7,567|
|Sohel2|null| 7| 567| Sohel2,c,7,567|
+------+----+---+----+---------------+
它给了我预期的结果,但是问题从这里开始我只想访问那些“_corrupt_record”并制作一个新的df。 我确实在 df 中过滤了“_corrupt_record”,但它似乎原始 CSV 文件没有“_corrupt_record”列,这就是它给我错误的原因。
badRows = df.filter("_corrupt_record is Not Null").show()
错误消息
Error while reading file dbfs:/tmp/test_file/test_csv.csv.
Caused by: java.lang.IllegalArgumentException: _corrupt_record does not exist. Available: Name, ID, int, int2
我正在流动 Databricks 文档, https://docs.databricks.com/data/data-sources/read-csv.html#read-files ,但是他们也有同样的错误,为什么他们甚至将它添加到文档中!!
我只想访问“_corrupt_record”列并制作新的 DF。 任何帮助或建议将不胜感激。
【问题讨论】:
【参考方案1】:您需要添加enforceSchema=True
。
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema,
enforceSchema=True, columnNameOfCorruptRecord='_corrupt_record')
这应该会给你损坏的记录列。
【讨论】:
你试过了吗?因为,我已经尝试过了,但它不起作用。我的问题也不是它,它给了我损坏的记录列,但我无法访问它。【参考方案2】:试试这个,我可以看到 DF 已经创建了 -
df = df.filter(F.col("_corrupt_record").isNotNull())
【讨论】:
【参考方案3】:问题在于您创建的数据框 df 不是 Delta Table。
尝试将df的内容存入Delta Table:
%sql
CREATE TABLE IF NOT EXISTS df_delta_temp
USING delta AS
SELECT *
FROM df;
如果现在您将查询您的增量表df_delta_temp,您的数据将会出现。
您的 Delta 表将在本地创建到您的工作区中(它将出现在左侧菜单的 Data 刀片中)。为了保持环境清洁,在详细说明结束时,您可以删除 Delta Table(这样它将用作临时表)。
%sql
DROP TABLE IF EXISTS df_delta_temp
我相信这篇有见地的文章会给你一些关于这个主题的有趣知识:https://python.plainenglish.io/how-to-handle-bad-data-in-spark-sql-5e0276d37ca1
【讨论】:
以上是关于如何访问 pyspark 中的“_corrupt_record”列?的主要内容,如果未能解决你的问题,请参考以下文章
如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?
我如何使用 s & $ 访问 Pyspark 中的变量,就像在 Scala 中一样