使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3
Posted
技术标签:
【中文标题】使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3【英文标题】:Write multiple rows at once to s3 in parquet format using sparkSession.createDataFrame 【发布时间】:2020-04-10 02:15:54 【问题描述】:我成功地创建了一个单帧/行并使用以下方法以镶木地板格式写入 s3
data_for_frame = ["Category": 'Category A', "ID": 1, "Value": Decimal(12.40),
"Category": 'Category B', "ID": 2, "Value": Decimal(30.10),
"Category": 'Category C', "ID": 3, "Value": Decimal(100.01)
]
dynamic_frame = sparkSession.createDataFrame(data_frame, schema_frame)
frame = sparkSession.createDataFrame(data_frame, schema_frame)
frame.write.parquet(s3_path)
如何一次写入多行以最小化每次写入?意味着我创建了多行并以某种方式将一个大帧传递给一次写入 s3
我试图将多个帧放入一个列表中,然后将其传递给frame.write
,但这不适用于错误AttributeError: 'list' object has no attribute 'write'
此外,如果我尝试使用单个帧在循环中写入 s3,则会出现错误
AnalysisException: 'path s3://***-test-bucket/testing/default_bucket/year=2020/month=4/day=4 already exists.;'
【问题讨论】:
【参考方案1】:在尝试写入循环时需要将mode
添加为append
。
directory
则默认 exists
会抛出错误
我们应该选择append
(将新文件添加到目录)(或)overwrite
(删除并重新创建目录)目录。
frame.write.mode("append").parquet(s3_path)
【讨论】:
解决了第二个问题,但仍会创建一个新文件 file 来追加,它不会写入相同或第一个 parquet 文件。以上是关于使用 sparkSession.createDataFrame 以 parquet 格式一次将多行写入 s3的主要内容,如果未能解决你的问题,请参考以下文章
在使用加载数据流步骤的猪中,使用(使用 PigStorage)和不使用它有啥区别?
Qt静态编译时使用OpenSSL有三种方式(不使用,动态使用,静态使用,默认是动态使用)