如何访问 pyspark 中的“_corrupt_record”列?

Posted

技术标签:

【中文标题】如何访问 pyspark 中的“_corrupt_record”列?【英文标题】:How to access "_corrupt_record" column in pyspark? 【发布时间】:2021-07-08 02:18:37 【问题描述】:

我在处理不良记录和文件 (CSV) 时遇到了一些问题。 这是我的 CSV 文件

+------+---+---+----+
|  Name| ID|int|int2|
+------+---+---+----+
| Sohel|  1|  4|  33|
| Sohel|  2|  5|  56|
| Sohel|  3|  6| 576|
| Sohel|  a|  7| 567|
|Sohel2|  c|  7| 567|
+------+---+---+----+

我正在使用预定义架构读取此文件

schema = StructType([
  StructField("Name",StringType(),True),
  StructField("ID",IntegerType(),True),
  StructField("int",IntegerType(),True),
  StructField("int2",IntegerType(),True),
  StructField("_corrupt_record", StringType(),True) 
  ])
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema, 
columnNameOfCorruptRecord='_corrupt_record')

结果是

+------+----+---+----+---------------+
|  Name|  ID|int|int2|_corrupt_record|
+------+----+---+----+---------------+
| Sohel|   1|  4|  33|           null|
| Sohel|   2|  5|  56|           null|
| Sohel|   3|  6| 576|           null|
| Sohel|null|  7| 567|  Sohel,a,7,567|
|Sohel2|null|  7| 567| Sohel2,c,7,567|
+------+----+---+----+---------------+

它给了我预期的结果,但是问题从这里开始我只想访问那些“_corrupt_record”并制作一个新的df。 我确实在 df 中过滤了“_corrupt_record”,但它似乎原始 CSV 文件没有“_corrupt_record”列,这就是它给我错误的原因。

badRows = df.filter("_corrupt_record is Not Null").show()

错误消息

Error while reading file dbfs:/tmp/test_file/test_csv.csv.
Caused by: java.lang.IllegalArgumentException: _corrupt_record does not exist. Available: Name, ID, int, int2

我正在流动 Databricks 文档, https://docs.databricks.com/data/data-sources/read-csv.html#read-files ,但是他们也有同样的错误,为什么他们甚至将它添加到文档中!!

我只想访问“_corrupt_record”列并制作新的 DF。 任何帮助或建议将不胜感激。

【问题讨论】:

【参考方案1】:

您需要添加enforceSchema=True

df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema, 
enforceSchema=True, columnNameOfCorruptRecord='_corrupt_record')

这应该会给你损坏的记录列。

【讨论】:

你试过了吗?因为,我已经尝试过了,但它不起作用。我的问题也不是它,它给了我损坏的记录列,但我无法访问它。【参考方案2】:

试试这个,我可以看到 DF 已经创建了 -

df = df.filter(F.col("_corrupt_record").isNotNull())

【讨论】:

【参考方案3】:

问题在于您创建的数据框 df 不是 Delta Table

尝试将df的内容存入Delta Table:

%sql
CREATE TABLE IF NOT EXISTS df_delta_temp
 USING delta AS
   SELECT *
   FROM df;

如果现在您将查询您的增量表df_delta_temp,您的数据将会出现。

您的 Delta 表将在本地创建到您的工作区中(它将出现在左侧菜单的 Data 刀片中)。为了保持环境清洁,在详细说明结束时,您可以删除 Delta Table(这样它将用作临时表)。

%sql
DROP TABLE IF EXISTS df_delta_temp

我相信这篇有见地的文章会给你一些关于这个主题的有趣知识:https://python.plainenglish.io/how-to-handle-bad-data-in-spark-sql-5e0276d37ca1

【讨论】:

以上是关于如何访问 pyspark 中的“_corrupt_record”列?的主要内容,如果未能解决你的问题,请参考以下文章

如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?

我如何使用 s & $ 访问 Pyspark 中的变量,就像在 Scala 中一样

pyspark 列中的访问名称

如何在 pyspark 中处理 Glue 数据目录中的空表

无法访问 EMR 集群 jupyter notebook 中的 pyspark

如何访问安装在 hdfs 头节点集群内的 pyspark