使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3

Posted

技术标签:

【中文标题】使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3【英文标题】:Write multiple rows at once to s3 in parquet format using sparkSession.createDataFrame 【发布时间】:2020-04-10 02:15:54 【问题描述】:

我成功地创建了一个单帧/行并使用以下方法以镶木地板格式写入 s3

data_for_frame = ["Category": 'Category A', "ID": 1, "Value": Decimal(12.40),
        "Category": 'Category B', "ID": 2, "Value": Decimal(30.10),
        "Category": 'Category C', "ID": 3, "Value": Decimal(100.01)
        ]


dynamic_frame = sparkSession.createDataFrame(data_frame, schema_frame)
frame = sparkSession.createDataFrame(data_frame, schema_frame)
frame.write.parquet(s3_path)

如何一次写入多行以最小化每次写入?意味着我创建了多行并以某种方式将一个大帧传递给一次写入 s3 我试图将多个帧放入一个列表中,然后将其传递给frame.write,但这不适用于错误AttributeError: 'list' object has no attribute 'write'

此外,如果我尝试使用单个帧在循环中写入 s3,则会出现错误

AnalysisException: 'path s3://***-test-bucket/testing/default_bucket/year=2020/month=4/day=4 already exists.;'

【问题讨论】:

【参考方案1】:

在尝试写入循环时需要将mode 添加为append

如果 spark 找到我们正在编写的 directory 则默认 exists 会抛出错误 我们应该选择append(将新文件添加到目录)(或)overwrite(删除并重新创建目录)目录。
frame.write.mode("append").parquet(s3_path)

【讨论】:

解决了第二个问题,但仍会创建一个新文件 file 来追加,它不会写入相同或第一个 parquet 文件。

以上是关于使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3的主要内容,如果未能解决你的问题,请参考以下文章

在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?

今目标使用教程 今目标任务使用篇

Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)

MySQL db 在按日期排序时使用“使用位置;使用临时;使用文件排序”

使用“使用严格”作为“使用强”的备份

Kettle java脚本组件的使用说明(简单使用升级使用)