AWS Glue 输出文件名

Posted

技术标签:

【中文标题】AWS Glue 输出文件名【英文标题】:AWS Glue output file name 【发布时间】:2018-07-24 00:19:11 【问题描述】:

我正在使用 AWS 来转换一些 JSON 文件。我已将文件从 S3 添加到 Glue。我设置的作业读取文件正常,作业运行成功,有一个文件添加到正确的 S3 存储桶中。我遇到的问题是我无法命名文件 - 它被赋予了一个随机名称,它也没有被赋予 .JSON 扩展名。

如何命名文件并将扩展名添加到输出中?

【问题讨论】:

Write single CSV file using spark-csv的可能重复 不能重复,上面分享的链接是针对spark的,解决方案适用于aws glue。在 spark 中很难解决这个问题。 【参考方案1】:

由于 Spark 工作方式的性质,无法命名文件。但是,之后可以立即重命名文件。

URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("s3://bucket_name"), sc._jsc.hadoopConfiguration())

file_path = "s3://bucket_name/processed/source=source_name/year=partition_year/week=partition_week/"
df.coalesce(1).write.format("json").mode(
    "overwrite").option("codec", "gzip").save(file_path)

# rename created file
created_file_path = fs.globStatus(Path(file_path + "part*.gz"))[0].getPath()
fs.rename(
    created_file_path,
    Path(file_path + "desired_name.jl.gz"))

【讨论】:

一个非常好的解决方案,帮助我将一个 s3 文件重命名为我想要的任何名称,谢谢 :) 中包含的“变量”是否自动得到解决,还是我们需要替换?我可以用实际的 buckname 替换,我认为是 soruce,但要保持自动化,您将如何改变年/月/日?【参考方案2】:

以下代码对我有用 -

source_DataFrame = glueContext.create_dynamic_frame.from_catalog(database = databasename, table_name = source_tablename_in_catalog, transformation_ctx = "source_DataFrame")

source_DataFrame = source_DataFrame.toDF().coalesce(1) #avoiding coalesce(1) will create many part-000* files according to data

from awsglue.dynamicframe import DynamicFrame
DyF = DynamicFrame.fromDF(source_DataFrame, glueContext, "DyF")

# writing the file as usual in Glue. **I have given some partitions** too.
# keep "partitionKeys":[] in case of no partitions
output_Parquet = glueContext.write_dynamic_frame.from_options(frame = DyF, connection_type = "s3", format = "parquet", connection_options = "path": destination_path + "/", "partitionKeys": ["department","team","card","datepartition"], transformation_ctx = "output_Parquet")

import boto3
client = boto3.client('s3')

#getting all the content/file inside the bucket. 
response = client.list_objects_v2(Bucket=bucket_name)
names = response["Contents"]

#Find out the file which have part-000* in it's Key
particulars = [name['Key'] for name in names if 'part-000' in name['Key']]

#Find out the prefix of part-000* because we want to retain the partitions schema 
location = [particular.split('part-000')[0] for particular in particulars]

#Constrain - copy_object has limit of 5GB.datepartition=20190131
for key,particular in enumerate(particulars):
    client.copy_object(Bucket=bucket_name, CopySource=bucket_name + "/" + particular, Key=location[key]+"newfile")
    client.delete_object(Bucket=bucket_name, Key=particular)

job.commit()

基石是当文件(copy_object)大于 5GB 时,它会复制失败。 你可以用这个

s3 = boto3.resource('s3')
for key,particular in enumerate(particulars):
    copy_source = 
        'Bucket': bucket_name,
        'Key': particular
    
    s3.meta.client.copy(copy_source, bucket_name, location[key]+"newfile")

【讨论】:

以上是关于AWS Glue 输出文件名的主要内容,如果未能解决你的问题,请参考以下文章

在运行 AWS Glue ETL 作业并命名输出文件名时,有没有办法从 S3 存储桶中读取文件名。 pyspark 是不是提供了一种方法来做到这一点?

如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet

AWS Glue 检查文件内容的正确性

将 FASTQ 文件读入 AWS Glue 作业脚本

AWS Glue 数据目录的头文件

使用 Python 在 AWS Glue 中打开和读取文件