通过火花数据框读取 S3 文件时,胶水书签不起作用

Posted

技术标签:

【中文标题】通过火花数据框读取 S3 文件时,胶水书签不起作用【英文标题】:Glue bookmark is not working when reading S3 files via spark dataframe 【发布时间】:2020-12-30 04:56:23 【问题描述】:

我有一个存储 .gz 文件(json 格式)的 S3 存储桶。每小时会有更多文件发送到此存储桶。我想使用 Glue 以增量方式(每天)从 S3 存储桶中读取数据,将 .gz 转换为 parquet 并写回另一个 S3 存储桶。

我想我可以使用 Glue 书签来读取/转换/写入增量文件。但是,我发现如果我阅读 spark 数据框中的 .gz 文件,则书签不起作用。换句话说,我下面的胶水作业不会增量读取文件。它从该存储桶中读取所有文件。我确实在那个 Glue 作业中启用了书签。

我在这里错过了什么吗?我需要通过胶水动态数据框而不是火花数据框来读取文件吗?

我实际上不知道如何通过胶水动态数据框正确读取 .gz 文件。根据我的理解,要使用 Glue 动态数据框,我需要针对该存储桶创建一个 Glue 爬虫来创建一个表。但是,该存储桶中有很多文件夹,每个日期名称都是文件夹名称。当我尝试在该存储桶上创建爬虫时,它在 Glue 中创建了一大堆表......

请,任何建议或意见将不胜感激!

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as sf
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import IntegerType, TimestampType, LongType
from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

srcDf = spark.read.json("s3://source/*/*")

partitionDf = srcDf.withColumn("date_col", to_date(col("timestamp"), 'yyyy-MM-dd')).withColumn("year", year(col("date_col"))).withColumn("month", month(col("date_col"))).withColumn("day", dayofmonth(col("date_col"))).repartition(1)

dynamicdf = DynamicFrame.fromDF(partitionDf, glueContext, "test_nest")

apilogs = glueContext.write_dynamic_frame.from_options(frame = dynamicdf, connection_type = "s3", connection_options = "path": "s3://destination/", "partitionKeys": ["year", "month", "day"], format = "glueparquet", transformation_ctx = "apilogs")


job.commit()

提前谢谢你!

【问题讨论】:

【参考方案1】:

书签是一种特定于胶水的功能,不适用于 spark 数据帧。您需要将其作为动态框架读取,以便跟踪先前处理的文件。拥有job.init()transformation_ctxjob.commit() 很重要。上面有更详细的文档here。

为了读取压缩后的 json 文件,您可以使用以下命令:

dyf = glueContext.create_dynamic_frame.from_options(
                connection_type = "s3",
                connection_options = "paths": [your_s3_bucket], 'compression': 'gzip' ,
                format = "json", transformation_ctx = "dyf"))

【讨论】:

感谢 Eman,我试了一下,似乎动态框架没有从我的 .gz 文件中读取任何内容。 dyf = glueContext.create_dynamic_frame_from_options(connection_type ="s3",connection_options = "paths": ["s3://test/*/*"], "compression": "gzip" ,format = "json", transformation_ctx ="dyf") 不使用 * 可以尝试使用递归选项,如下所示:dyf = glueContext.create_dynamic_frame_from_options(connection_type ="s3",connection_options = "paths": ["s3://test/"], "compression": "gzip", "recurse": True ,format = "json", transformation_ctx ="dyf")

以上是关于通过火花数据框读取 S3 文件时,胶水书签不起作用的主要内容,如果未能解决你的问题,请参考以下文章

当明确给出 s3 路径时,模式合并不起作用

胶水作业无法写入文件

NoClassDefFoundError:org/apache/hadoop/fs/StreamCapabilities,同时用火花读取s3数据

无法从镶木地板中读取零件文件

无法将 aws 胶水动态帧转换为火花数据帧

读入火花数据框时如何从csv文件中删除列