如何从 PySpark DataFrame 中获取随机行?

Posted

技术标签:

【中文标题】如何从 PySpark DataFrame 中获取随机行?【英文标题】:How take a random row from a PySpark DataFrame? 【发布时间】:2016-03-04 08:48:29 【问题描述】:

我只看到方法sample() 以分数作为参数。将此分数设置为 1/numberOfRows 会导致随机结果,有时我不会得到任何行。

RDD 上有一个方法takeSample(),它将您希望样本包含的元素数量作为参数。我知道这可能会很慢,因为您必须计算每个分区,但是有没有办法在 DataFrame 上获得类似的东西?

【问题讨论】:

【参考方案1】:

您可以在RDD 上简单地拨打takeSample

df = sqlContext.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c"), (4, "d")], ("k", "v"))
df.rdd.takeSample(False, 1, seed=0)
## [Row(k=3, v='c')]

如果你不想收集,你可以简单地采取更高的分数和限制:

df.sample(False, 0.1, seed=0).limit(1)

不要传递seed,每次都应该得到不同的DataFrame。

【讨论】:

有没有办法获取随机值。在上述情况下,每次运行查询时都会生成相同的数据帧。 不错的提示,@LateCoder! (在 Spark 2.3.1 上,保持 seed=None 似乎只适用于 df.rdd.takeSample,而不适用于 df.sample。) 为什么不想collect 哦,因为collect returns it to the driver program 它可能不适合驱动程序的内存。 我不认为第二个样本 -> 限制解决方案是非常随机的。 sample() 部分很好而且是随机的,但是在达到限制之前结果似乎有些排序。如果您使用 limit(10) 而不是 1 并且您的分数太大,这一点尤其明显。结果可能看起来相似。【参考方案2】:

不同类型的样本

随机抽样 % 的数据,无论是否替换

import pyspark.sql.functions as F
#Randomly sample 50% of the data without replacement
sample1 = df.sample(False, 0.5, seed=0)

#Randomly sample 50% of the data with replacement
sample1 = df.sample(True, 0.5, seed=0)

#Take another sample exlcuding records from previous sample using Anti Join
sample2 = df.join(sample1, on='ID', how='left_anti').sample(False, 0.5, seed=0)

#Take another sample exlcuding records from previous sample using Where
sample1_ids = [row['ID'] for row in sample1.ID]
sample2 = df.where(~F.col('ID').isin(sample1_ids)).sample(False, 0.5, seed=0)

#Generate a startfied sample of the data across column(s)
#Sampling is probabilistic and thus cannot guarantee an exact number of rows
fractions = 
        'NJ': 0.5, #Take about 50% of records where state = NJ
    'NY': 0.25, #Take about 25% of records where state = NY
    'VA': 0.1, #Take about 10% of records where state = VA

stratified_sample = df.sampleBy(F.col('state'), fractions, seed=0)

【讨论】:

【参考方案3】:

这是使用 Pandas DataFrame.Sample 方法的替代方法。这使用 spark applyInPandas 方法分发组,可从 Spark 3.0.0 获得。这允许您选择每组的确切行数。

我已将argskwargs 添加到函数中,以便您可以访问DataFrame.Sample 的其他参数。

def sample_n_per_group(n, *args, **kwargs):
    def sample_per_group(pdf):
        return pdf.sample(n, *args, **kwargs)
    return sample_per_group

df = spark.createDataFrame(
    [
        (1, 1.0), 
        (1, 2.0), 
        (2, 3.0), 
        (2, 5.0), 
        (2, 10.0)
    ],
    ("id", "v")
)

(df.groupBy("id")
   .applyInPandas(
        sample_n_per_group(1, random_state=2), 
        schema=df.schema
   )
)

要了解大型组的限制,来自documentation:

此功能需要完全随机播放。一个组的所有数据将是 加载到内存中,因此用户应注意潜在的 OOM 如果数据有偏差并且某些组太大而无法适应,则存在风险 记忆。

【讨论】:

以上是关于如何从 PySpark DataFrame 中获取随机行?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Pyspark Dataframe 中的字符串列中过滤字母值?

使用 Pyspark 从 REST API 获取数据到 Spark Dataframe

如何从数据框中获取 1000 条记录并使用 PySpark 写入文件?

从 pyspark 中的数据框数组类型列中获取“名称”元素

如何从 PySpark Dataframe 中删除重复项并将剩余列值更改为 null

如何在 PySpark 1.6 中将 DataFrame 列从字符串转换为浮点/双精度?