AWS Glue PySpark 替换 NULL
Posted
技术标签:
【中文标题】AWS Glue PySpark 替换 NULL【英文标题】:AWS Glue PySpark replace NULLs 【发布时间】:2017-12-20 23:25:23 【问题描述】:我正在运行 AWS Glue 作业,以使用 Glue 自动生成的 PySpark 脚本将 S3 上的管道分隔文件加载到 RDS Postgres 实例中。
最初,它抱怨某些列中有 NULL 值:
pyspark.sql.utils.IllegalArgumentException: u"Can't get JDBC type for null"
在对 SO 进行一些谷歌搜索和阅读之后,我尝试通过将我的 AWS Glue 动态数据帧转换为 Spark 数据帧、执行函数 fillna() 并重新转换回动态数据框。
datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"xyz_catalog", table_name = "xyz_staging_files", transformation_ctx =
"datasource0")
custom_df = datasource0.toDF()
custom_df2 = custom_df.fillna(-1)
custom_df3 = custom_df2.fromDF()
applymapping1 = ApplyMapping.apply(frame = custom_df3, mappings = [("id",
"string", "id", "int"),........more code
参考资料:
https://github.com/awslabs/aws-glue-samples/blob/master/FAQ_and_How_to.md#3-there-are-some-transforms-that-i-cannot-figure-out
How to replace all Null values of a dataframe in Pyspark
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna
现在,当我运行我的工作时,它会引发以下错误:
Log Contents:
Traceback (most recent call last):
File "script_2017-12-20-22-02-13.py", line 23, in <module>
custom_df3 = custom_df2.fromDF()
AttributeError: 'DataFrame' object has no attribute 'fromDF'
End of LogType:stdout
我是 Python 和 Spark 的新手,已经尝试了很多,但无法理解这一点。感谢专家对此的帮助。
我尝试将我的 reconvert 命令更改为:
custom_df3 = glueContext.create_dynamic_frame.fromDF(frame = custom_df2)
但还是报错:
AttributeError: 'DynamicFrameReader' object has no attribute 'fromDF'
更新: 我怀疑这与 NULL 值无关。消息“Can't get JDBC type for null”似乎不是指 NULL 值,而是 JDBC 无法破译的某些数据/类型。
我创建了一个只有 1 条记录的文件,没有 NULL 值,将所有布尔类型更改为 INT(并将值替换为 0 和 1),但仍然得到相同的错误:
pyspark.sql.utils.IllegalArgumentException: u"Can't get JDBC type for null"
更新: 确保已导入 DynamicFrame(从 awsglue.context import DynamicFrame),因为 fromDF / toDF 是 DynamicFrame 的一部分。
参考https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html
【问题讨论】:
会不会是列数据类型的原因?我记得像fillna()
和dropna()
这样的东西默认情况下只影响具有string
数据类型的列。例如,如果列类型是date
,它仍然可以在fillna()
和之后包含空值dropna()
您是否尝试过使用DropNullFields?它从 DynamicFrame 中删除空字段。输出 DynamicFrame 在架构中不包含 null 类型的字段。这样您就可以只使用针对 Glue 作业进行了微调的动态帧,避免与数据帧相互转换。
【参考方案1】:
您在错误的类上调用 .fromDF。它应该是这样的:
from awsglue.dynamicframe import DynamicFrame
DyamicFrame.fromDF(custom_df2, glueContext, 'label')
【讨论】:
【参考方案2】:对于这个错误,pyspark.sql.utils.IllegalArgumentException: u"Can't get JDBC type for null"
您应该使用删除 Null 列。
我在加载到 Redshift 数据库表时遇到了类似的错误。使用以下命令后,问题得到解决
Loading= DropNullFields.apply(frame = resolvechoice3, transformation_ctx = "Loading")
【讨论】:
【参考方案3】:在 Pandas 中,对于 Pandas DataFrame
,pd.fillna()
用于用其他指定值填充 null
值。但是,DropNullFields
会删除类型为NullType
的DynamicFrame
中的所有空字段。这些字段在DynamicFrame
数据集中的每条记录中都有缺失值或null
值。
在您的具体情况下,您需要确保您正在使用写入 class
来获取适当的数据集。
这是您的代码的编辑版本:
datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"xyz_catalog", table_name = "xyz_staging_files", transformation_ctx =
"datasource0")
custom_df = datasource0.toDF()
custom_df2 = custom_df.fillna(-1)
custom_df3 = DyamicFrame.fromDF(custom_df2, glueContext, 'your_label')
applymapping1 = ApplyMapping.apply(frame = custom_df3, mappings = [("id",
"string", "id", "int"),........more code
这就是您正在执行的操作:1. 读取DynamicFrame
中的文件,2. 将其转换为DataFrame
,3. 删除null
值,4. 转换回DynamicFrame
,以及5. @ 987654335@。您收到以下错误,因为您的第 4 步错误,并且您将 DataFrame
提供给 ApplyMapping
,这不起作用。 ApplyMapping
专为 DynamicFrame
s 设计。
我建议阅读DynamicFrame
中的数据并坚持使用相同的数据类型。它看起来像这样(一种方法):
from awsglue.dynamicframe import DynamicFrame
datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"xyz_catalog", table_name = "xyz_staging_files", transformation_ctx =
"datasource0")
custom_df = DropNullFields.apply(frame=datasource0)
applymapping1 = ApplyMapping.apply(frame = custom_df, mappings = [("id",
"string", "id", "int"),........more code
【讨论】:
以上是关于AWS Glue PySpark 替换 NULL的主要内容,如果未能解决你的问题,请参考以下文章
从 AWS Glue/PySpark 中的 100 个表中选择数据
如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?