如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?

Posted

技术标签:

【中文标题】如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?【英文标题】:How to get 1000 records from dataframe and write into a file using PySpark? 【发布时间】:2020-04-24 15:48:42 【问题描述】:

我在数据框中有 100,000 多条记录。我想动态创建一个文件并为每个文件推送 1000 条记录。谁能帮我解决这个问题,在此先感谢。

【问题讨论】:

【参考方案1】:

您可以在编写dataframe 时使用maxRecordsPerFile 选项。

如果您需要整个数据帧在每个文件中写入 1000 条记录,请使用repartition(1) (or)每个分区写入 1000 条记录使用 .coalesce(1)

Example:

# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)

# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)

#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()

df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)

Method-2:

Caluculating number of partitions then repartition the dataframe:

df = spark.range(10000)

#caluculate partitions
no_partitions=df.count()/1000

from pyspark.sql.functions import *

#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()

#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#|          1|    1001|
#|          6|    1000|
#|          3|     999|
#|          5|    1000|
#|          9|    1000|
#|          4|     999|
#|          8|    1000|
#|          7|    1000|
#|          2|    1001|
#|          0|    1000|
#+-----------+--------+

df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)

【讨论】:

我怎样才能给自定义文件名而不是自动生成? @tsuresh97,默认情况下我们不能在 spark 中使用control filenames,但只能指定 directory 名称,要更改文件名需要使用 hadoop.fs api 对于这种情况***.com/questions/41990086/… 非常感谢@Shu 我需要将数据帧的记录写入 json 文件。如果我将数据帧写入它存储的文件中,例如 "a":1 "b":2,我想像这样 ["a":1 ,"b":2 写入数据帧]。可以请@Shu。提前致谢。 @tsuresh97,请查看我的回答***.com/a/61425542/7632695【参考方案2】:

首先,创建一个行号列

df = df.withColumn('row_num', F.row_number().over(Window.orderBy('any_column'))

现在,运行一个循环并继续保存记录。

for i in range(0, df.count(), 1000):
   records = df.where(F.col("row_num").between(i, i+999))
   records.toPandas().to_csv("file-.csv".format(i))

【讨论】:

我需要将数据帧的记录写入 json 文件。如果我将数据帧写入它存储的文件中,例如 "a":1 "b":2,我想像这样 ["a":1 ,"b":2 写入数据帧]。你能帮我么。提前致谢。 @Prateek 耆那教 使用 pandas 我们也可以存储为 Json 格式,这正是您所需要的。因此,只需将 .to_csv 更改为 .to_json。有关详细信息,请参阅此链接 - pandas.pydata.org/pandas-docs/stable/reference/api/… 感谢@Prateek Jain

以上是关于如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何获取超过1000?

如何从雪花数据库中的表中删除前 N 条记录

如何获取数据列表,特定日期仅从每个日期获取 6 条记录而不是更多

如何使用教义/symfony4 从数据库中获取(连接)两条记录

如何使用 C#/SQL 批量更新 1000 条记录

如何从表中逐块获取数据?