如何在限制行数的同时拆分 Pyspark 数据帧?

Posted

技术标签:

【中文标题】如何在限制行数的同时拆分 Pyspark 数据帧?【英文标题】:How to split a Pyspark dataframe while limiting the number of rows? 【发布时间】:2020-05-06 23:56:46 【问题描述】:

我正在将数据从数据帧发送到限制为 50,000 行的 API。假设我的数据框有 70,000 行,我如何将其拆分为单独的数据框,每个数据框的最大行数为 50,000?这些不必是偶数,数据顺序无关紧要。

【问题讨论】:

你可以使用df.count 的条件,如果大于50k 使用randomSplit 函数。 spark.apache.org/docs/latest/api/python/… 类似def split(df): if df.count()>50000: df1,df2=df.randomSplit([0.5,0.5],24) return df1,df2 else: return df 乔希,你可能会找到答案here @Josh 更好的解决方案是在数据帧上利用 foreachPartition 方法,通过这种方式,您可以控制每个分区的确切行数,并将数据直接发送到您的 API,如您之前在此处询问的那样***.com/questions/61645936/…. 另一种解决方法是使用.limit() 函数。您可以执行以下操作:假设您的 70k 行主 df 是 original_df。因此,您可以第一次像 limited_df = df.limit(50000) 一样获得 50k 行,对于接下来的行,您可以像 original_df.subtract(limited_df) 一样获得剩余的行。如果需要,您甚至可以为减去的 df 执行 .limit()。 【参考方案1】:

解决方法是使用.limit() 函数。您可以执行以下操作:假设您的 70k 行主 df 是 original_df。所以你可以这样做

limited_df = df.limit(50000)

第一次获得 50k 行,接下来的行你可以做

original_df.subtract(limited_df)

你会得到剩下的行。如果需要,您甚至可以对减去的 df 执行 .limit()。

更新: 您可以对数据框中存在的任意数量的行执行此操作。假设您的数据帧有 30000 行,如果您执行了 df.limit(50000),它不会抛出任何错误,只会返回数据帧中存在的 30k 行。

【讨论】:

【参考方案2】:

您可以通过使用row_number然后每50000行拆分来实现以下目的

#order by any column to populate the row number
window=Window.orderBy('ID')
length=df1.count()
df2=df1.withColumn('row',f.row_number().over(window))

step=50000
for i in range(1,length,step):
    df3 = df2.filter((f.col('row')>=i) & (f.col('row')<=i+step-1))
    #Here perform your API call as it will contain only 50000 rows at one time 

【讨论】:

【参考方案3】:

在@frosty 上添加了他的回答:

limited_df = df.limit(50000).cache()
rest_df = original_df.subtract(limited_df)

建议.cache() 保持一致性,因为没有它limited_dfrest_df 可以有重叠的行。这种行为是由于 PySpark 多次运行 .limit()(一次用于 limited_df,一次用于 rest_df)。

附注单独回答的原因:我还不能发表评论。

【讨论】:

以上是关于如何在限制行数的同时拆分 Pyspark 数据帧?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 按列将数据帧拆分为几个数据帧

PySpark:计算列子集的最大行数并添加到现有数据帧

如何有效地计算数据帧的行数? [复制]

Pyspark - 如何将多个数据帧的列连接成一个数据帧的列

Pandas:按行数将数据帧拆分为多个数据帧

Pyspark 数据帧拆分并将分隔列值填充到 N 索引数组中