如何在限制行数的同时拆分 Pyspark 数据帧?
Posted
技术标签:
【中文标题】如何在限制行数的同时拆分 Pyspark 数据帧?【英文标题】:How to split a Pyspark dataframe while limiting the number of rows? 【发布时间】:2020-05-06 23:56:46 【问题描述】:我正在将数据从数据帧发送到限制为 50,000 行的 API。假设我的数据框有 70,000 行,我如何将其拆分为单独的数据框,每个数据框的最大行数为 50,000?这些不必是偶数,数据顺序无关紧要。
【问题讨论】:
你可以使用df.count
的条件,如果大于50k 使用randomSplit
函数。 spark.apache.org/docs/latest/api/python/…
类似def split(df): if df.count()>50000: df1,df2=df.randomSplit([0.5,0.5],24) return df1,df2 else: return df
乔希,你可能会找到答案here
@Josh 更好的解决方案是在数据帧上利用 foreachPartition
方法,通过这种方式,您可以控制每个分区的确切行数,并将数据直接发送到您的 API,如您之前在此处询问的那样***.com/questions/61645936/….
另一种解决方法是使用.limit()
函数。您可以执行以下操作:假设您的 70k 行主 df 是 original_df。因此,您可以第一次像 limited_df = df.limit(50000)
一样获得 50k 行,对于接下来的行,您可以像 original_df.subtract(limited_df)
一样获得剩余的行。如果需要,您甚至可以为减去的 df 执行 .limit()。
【参考方案1】:
解决方法是使用.limit()
函数。您可以执行以下操作:假设您的 70k 行主 df 是 original_df。所以你可以这样做
limited_df = df.limit(50000)
第一次获得 50k 行,接下来的行你可以做
original_df.subtract(limited_df)
你会得到剩下的行。如果需要,您甚至可以对减去的 df 执行 .limit()。
更新: 您可以对数据框中存在的任意数量的行执行此操作。假设您的数据帧有 30000 行,如果您执行了 df.limit(50000),它不会抛出任何错误,只会返回数据帧中存在的 30k 行。
【讨论】:
【参考方案2】:您可以通过使用row_number然后每50000行拆分来实现以下目的
#order by any column to populate the row number
window=Window.orderBy('ID')
length=df1.count()
df2=df1.withColumn('row',f.row_number().over(window))
step=50000
for i in range(1,length,step):
df3 = df2.filter((f.col('row')>=i) & (f.col('row')<=i+step-1))
#Here perform your API call as it will contain only 50000 rows at one time
【讨论】:
【参考方案3】:在@frosty 上添加了他的回答:
limited_df = df.limit(50000).cache()
rest_df = original_df.subtract(limited_df)
建议.cache()
保持一致性,因为没有它limited_df
和rest_df
可以有重叠的行。这种行为是由于 PySpark 多次运行 .limit()
(一次用于 limited_df
,一次用于 rest_df
)。
附注单独回答的原因:我还不能发表评论。
【讨论】:
以上是关于如何在限制行数的同时拆分 Pyspark 数据帧?的主要内容,如果未能解决你的问题,请参考以下文章