AWS Glue 抓取动态 S3 路径位置

Posted

技术标签:

【中文标题】AWS Glue 抓取动态 S3 路径位置【英文标题】:AWS Glue Crawl Dynamic S3 Path Location 【发布时间】:2019-04-04 18:20:13 【问题描述】:

我正在 AWS Glue 中创建一个 ETL 作业,该作业将从 S3 位置提取存储库中每个实体的最新编辑或当前数据。存储库中的数据是实体所有编辑的历史记录。我每天运行 ETL,它会写入另一个 S3 位置,即 Bucket/path/to/files/current_date/...,其中当前日期是动态的并且与 ETL 运行的日期一致。

我遇到的问题是我无法以编程方式从 S3 中删除(组织限制),或者移动文件,因为这是副本并在幕后删除,所以它也失败了,留下一条单一路径供胶水爬行。我想设置爬虫,使路径的日期部分是动态的,但我无法找到一种方法来做到这一点——有人知道这是否可能吗?

我的数据按 run_date(请参阅上面的当前日期)以及其他 6 个分层分区进行分区。我正在通过 yaml 语言 CloudFormation 创建爬虫和 ETL 作业。爬虫的路径存储为 CloudFormation 脚本中定义的 ssm 参数。

路径 SSM 参数示例

S3CurrentPath:
    Type: AWS::SSM::Parameter
    Properties:
      Description: "Path in the S3 Lake where the current entity data is stored."
      Type: String
      Value: 'Data/Entities/Software/SoftwareCurrent'
      Name: "/org/member/local/s3/path/entityCurrent"

爬虫资源代码:

GenericCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Role: !Ref RoleNAme
      Name: !Sub "$ProfileName-crawler-$CrawlerName"
      Configuration: !Sub |
        
          "Version": 1.0,
          "CrawlerOutput": 
            "Partitions":  "AddOrUpdateBehavior": "InheritFromTable" ,
            "Tables":  "AddOrUpdateBehavior": "MergeNewColumns" 
          
        
      Description: !Ref CrawlerDescription
      DatabaseName: !Ref DatabaseName
      Targets:
        S3Targets:
          - Path: !Sub "s3://$S3DataBucket/$S3Path"

ETL DataSink 编写代码:

# Write the joined dynamic frame out to a datasink
        datasink = glueContext.write_dynamic_frame.from_options(
                frame = final_dynamic_frame, connection_type = "s3",
                connection_options = 
                    'path': 's3://lakeBucketName/lakePath/'.format(
                        lakeBucketName=args['lakeBucketName'],
                        lakePath=args['lakeDestinationPath']),
                        "partitionKeys": ['run_date','location','year','month','day','hour','timestamp'],
                format = "parquet",
                transformation_ctx = "datasink")

我希望爬虫会查看存储库中的最新日期,即最新的 run_date 分区“文件夹”,并在不回顾旧数据的情况下对其进行爬取。

如果您想查看更多代码,请告诉我——我很乐意清理并提供。

【问题讨论】:

【参考方案1】:

说实话,我还没有找到使用 AWS Glue 将数据读/写到动态路径的方法。我通常做的是使用 PySpark 方法读/写:

datasink.write.\
        format("com.databricks.spark.csv").\
        option("header", "true").\
        mode("overwrite").\
        save("s3://my-bucket/files/" + current_date + "*.csv")

您甚至可以告诉该方法只是读取/写入特定类型的文件(例如 .csv)。 PySpark 比 AWS Glue 具有更多选项和可用方法,因此具有更大的灵活性。另外,我在 DynamoDB 表中添加了一个键/值记录来保存最新日期的记录。

【讨论】:

谢谢,Aida——我要试一试,看看它是否适合我的需要。如果是这样,我会回来并选择这个作为答案。谢谢! 恒星——这很顺利,艾达!我现在只需要看看我是否可以使用 datasink.write 方法创建分区。

以上是关于AWS Glue 抓取动态 S3 路径位置的主要内容,如果未能解决你的问题,请参考以下文章

使用 AWS Glue 从 S3 读取动态 DataTpes

AWS Glue-如何在 S3 中将动态帧编写为 .txt 文件并使用“|”作为分隔符

如何在 ETL 处理之前检查 AWS Glue 架构?

AWS Glue 不检测分区并在目录中创建 1000 多个表

AWS Glue ETL 到 Redshift:日期

AWS Glue 检查文件内容的正确性