如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?
Posted
技术标签:
【中文标题】如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?【英文标题】:How to get 1000 records from dataframe and write into a file using PySpark? 【发布时间】:2020-04-24 15:48:42 【问题描述】:我在数据框中有 100,000 多条记录。我想动态创建一个文件并为每个文件推送 1000 条记录。谁能帮我解决这个问题,在此先感谢。
【问题讨论】:
【参考方案1】:您可以在编写dataframe
时使用maxRecordsPerFile
选项。
repartition(1)
(or)
为每个分区写入 1000 条记录使用 .coalesce(1)
Example:
# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()
df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)
Method-2:
Caluculating number of partitions then repartition the dataframe:
df = spark.range(10000)
#caluculate partitions
no_partitions=df.count()/1000
from pyspark.sql.functions import *
#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()
#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#| 1| 1001|
#| 6| 1000|
#| 3| 999|
#| 5| 1000|
#| 9| 1000|
#| 4| 999|
#| 8| 1000|
#| 7| 1000|
#| 2| 1001|
#| 0| 1000|
#+-----------+--------+
df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)
【讨论】:
我怎样才能给自定义文件名而不是自动生成? @tsuresh97,默认情况下我们不能在 spark 中使用control filenames
,但只能指定 directory 名称,要更改文件名需要使用 hadoop.fs api 对于这种情况***.com/questions/41990086/…
非常感谢@Shu
我需要将数据帧的记录写入 json 文件。如果我将数据帧写入它存储的文件中,例如 "a":1 "b":2,我想像这样 ["a":1 ,"b":2 写入数据帧]。可以请@Shu。提前致谢。
@tsuresh97,请查看我的回答***.com/a/61425542/7632695【参考方案2】:
首先,创建一个行号列
df = df.withColumn('row_num', F.row_number().over(Window.orderBy('any_column'))
现在,运行一个循环并继续保存记录。
for i in range(0, df.count(), 1000):
records = df.where(F.col("row_num").between(i, i+999))
records.toPandas().to_csv("file-.csv".format(i))
【讨论】:
我需要将数据帧的记录写入 json 文件。如果我将数据帧写入它存储的文件中,例如 "a":1 "b":2,我想像这样 ["a":1 ,"b":2 写入数据帧]。你能帮我么。提前致谢。 @Prateek 耆那教 使用 pandas 我们也可以存储为 Json 格式,这正是您所需要的。因此,只需将 .to_csv 更改为 .to_json。有关详细信息,请参阅此链接 - pandas.pydata.org/pandas-docs/stable/reference/api/… 感谢@Prateek Jain以上是关于如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?的主要内容,如果未能解决你的问题,请参考以下文章
如何获取数据列表,特定日期仅从每个日期获取 6 条记录而不是更多