从 Spark GroupedData 对象中选择随机项

Posted

技术标签:

【中文标题】从 Spark GroupedData 对象中选择随机项【英文标题】:Choosing random items from a Spark GroupedData Object 【发布时间】:2015-11-17 05:44:01 【问题描述】:

我刚开始在 Python 中使用 Spark,一直无法解决这个问题:在 pyspark.sql.dataframe.DataFrame 上运行 groupBy

df = sqlsc.read.json("data.json")
df.groupBy('teamId')

如何从每个结果组(按 teamId 分组)中选择 N 随机样本而不进行替换?

我基本上是在尝试从每个团队中随机选择N 用户,也许一开始使用groupBy 是错误的?

【问题讨论】:

【参考方案1】:

嗯,有点不对劲。 GroupedData 并不是真正为数据访问而设计的。它只是描述了分组标准并提供了聚合方法。有关详细信息,请参阅我对 Using groupBy in Spark and getting back to a DataFrame 的回复。

这个想法的另一个问题是选择N random samples。如果没有对数据进行心理分组,这是一项很难并行实现的任务,而且当您 call groupBy 在 DataFrame 上时不会发生这种情况:

至少有两种方法可以解决这个问题:

转换为 RDD,groupBy 并执行本地采样

import random

n = 3

def sample(iter, n): 
    rs = random.Random()  # We should probably use os.urandom as a seed
    return rs.sample(list(iter), n)    

df = sqlContext.createDataFrame(
    [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"], 
    ("teamId", "x1", "x2"))

grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey()

sampled = sqlContext.createDataFrame(
    grouped.flatMap(lambda kv: sample(kv[1], n)))

sampled.show()

## +------+---+-------------------+
## |teamId| x1|                 x2|
## +------+---+-------------------+
## |     1|  g|   0.81921738561455|
## |     1|  f| 0.8563875814036598|
## |     1|  a| 0.9010425238735935|
## |     2|  c| 0.3864428179837973|
## |     2|  g|0.06233470405822805|
## |     2|  d|0.37620872770129155|
## |     3|  f| 0.7518901502732027|
## |     3|  e| 0.5142305439671874|
## |     3|  d| 0.6250620479303716|
## +------+---+-------------------+

使用窗口函数

from pyspark.sql import Window
from pyspark.sql.functions import col, rand, rowNumber

w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))

sampled = (df
    .withColumn("rnd_", rand())  # Add random numbers column
    .withColumn("rn_", rowNumber().over(w))  # Add rowNumber over windw
    .where(col("rn_") <= n)  # Take n observations
    .drop("rn_")  # drop helper columns
    .drop("rnd_"))

sampled.show()

## +------+---+--------------------+
## |teamId| x1|                  x2|
## +------+---+--------------------+
## |     1|  f|  0.8563875814036598|
## |     1|  g|    0.81921738561455|
## |     1|  i|  0.8173912535268248|
## |     2|  h| 0.10862995810038856|
## |     2|  c|  0.3864428179837973|
## |     2|  a|  0.6695356657072442|
## |     3|  b|0.012329360826023095|
## |     3|  a|  0.6450777858109182|
## |     3|  e|  0.5142305439671874|
## +------+---+--------------------+

但恐怕两者都会相当昂贵。如果各个组的大小是平衡的并且相对较大,我会简单地使用DataFrame.randomSplit

如果组的数量相对较少,可以尝试其他方法:

from pyspark.sql.functions import count, udf
from pyspark.sql.types import BooleanType
from operator import truediv

counts = (df
    .groupBy(col("teamId"))
    .agg(count("*").alias("n"))
    .rdd.map(lambda r: (r.teamId, r.n))
    .collectAsMap()) 

# This defines fraction of observations from a group which should
# be taken to get n values 
counts_bd = sc.broadcast(k: truediv(n, v) for (k, v) in counts.items())

to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())

sampled = (df
    .withColumn("rnd_", rand())
    .where(to_take(col("teamId"), col("rnd_")))
    .drop("rnd_"))

sampled.show()

## +------+---+--------------------+
## |teamId| x1|                  x2|
## +------+---+--------------------+
## |     1|  d| 0.14815204548854788|
## |     1|  f|  0.8563875814036598|
## |     1|  g|    0.81921738561455|
## |     2|  a|  0.6695356657072442|
## |     2|  d| 0.37620872770129155|
## |     2|  g| 0.06233470405822805|
## |     3|  b|0.012329360826023095|
## |     3|  h|  0.9022527556458557|
## +------+---+--------------------+

在 Spark 1.5+ 中,您可以将 udf 替换为对 sampleBy 方法的调用:

df.sampleBy("teamId", counts_bd.value)

它不会为您提供确切的观察次数,但只要每组的观察次数足够大以获取适当的样本,大多数情况下就足够了。您也可以以类似的方式在 RDD 上使用sampleByKey

【讨论】:

如果您无法导入rowNumer,对我来说是row_number,可能是因为Pyspark 更新。【参考方案2】:

我发现这多了一个数据框,而不是进入 rdd 方式。

您可以使用window 函数在组内创建排名,其中排名可以是随机的以适合您的情况。然后,您可以根据每个组所需的样本数量(N) 进行过滤

window_1 = Window.partitionBy(data['teamId']).orderBy(F.rand())
data_1 = data.select('*', F.rank().over(window_1).alias('rank')).filter(F.col('rank') <= N).drop('rank')

【讨论】:

这很棒,工作完美。我很感激不需要为此恢复到 RDD。 非常好的和简洁的解决方案! +1

以上是关于从 Spark GroupedData 对象中选择随机项的主要内容,如果未能解决你的问题,请参考以下文章

GroupedData 的长度“‘GroupedData’类型的对象没有 len()”

将 pyspark groupedData 转换为 pandas DataFrame

Pyspark 2.4 中的 GroupedData 对象

使用无法直接从GroupedData类调用的方法聚合多个列(如“last()”)并将它们重命名为原始名称[duplicate]

以序列作为键参数的 Spark Dataframe groupBy [重复]

从 Spark 中的 DataFrame 中过滤和选择数据