PySpark,DataFrame 的顶部

Posted

技术标签:

【中文标题】PySpark,DataFrame 的顶部【英文标题】:PySpark, top for DataFrame 【发布时间】:2017-09-01 21:15:16 【问题描述】:

我想要做的是给定一个 DataFrame,根据某个指定的列取前 n 个元素。 (自拍,NUM)在RDD API正是我想要的。我不知道是否有相当的API在数据帧的世界? P>

我的第一次尝试是以下 P>

def retrieve_top_n(df, n):
    # assume we want to get most popular n 'key' in DataFrame
    return df.groupBy('key').count().orderBy('count', ascending=False).limit(n).select('key')

但是,我已经意识到这会导致不确定的行为(我不知道确切原因,但我猜 limit(n) 并不能保证取哪个 n)

【问题讨论】:

【参考方案1】:

首先我们定义一个生成测试数据的函数:

import numpy as np

def sample_df(num_records):
    def data():
      np.random.seed(42)
      while True:
          yield int(np.random.normal(100., 80.))

    data_iter = iter(data())
    df = sc.parallelize((
        (i, next(data_iter)) for i in range(int(num_records))
    )).toDF(('index', 'key_col'))

    return df

sample_df(1e3).show(n=5)
+-----+-------+
|index|key_col|
+-----+-------+
|    0|    139|
|    1|     88|
|    2|    151|
|    3|    221|
|    4|     81|
+-----+-------+
only showing top 5 rows

现在,让我们提出三种不同的计算 TopK 的方法:

from pyspark.sql import Window
from pyspark.sql import functions


def top_df_0(df, key_col, K):
    """
    Using window functions.  Handles ties OK.
    """
    window = Window.orderBy(functions.col(key_col).desc())
    return (df
            .withColumn("rank", functions.rank().over(window))
            .filter(functions.col('rank') <= K)
            .drop('rank'))


def top_df_1(df, key_col, K):
    """
    Using limit(K). Does NOT handle ties appropriately.
    """
    return df.orderBy(functions.col(key_col).desc()).limit(K)


def top_df_2(df, key_col, K):
    """
    Using limit(k) and then filtering.  Handles ties OK."
    """
    num_records = df.count()
    value_at_k_rank = (df
                       .orderBy(functions.col(key_col).desc())
                       .limit(k)
                       .select(functions.min(key_col).alias('min'))
                       .first()['min'])

    return df.filter(df[key_col] >= value_at_k_rank)

名为top_df_1 的函数与您最初实现的函数相似。它给你非确定性行为的原因是它不能很好地处理关系。如果您有大量数据并且只对近似答案感兴趣以提高性能,那么这可能是一个不错的选择。

最后,让我们进行基准测试

对于基准测试,使用具有 400 万个条目的 Spark DF 并定义一个便利函数:

NUM_RECORDS = 4e6
test_df = sample_df(NUM_RECORDS).cache()

def show(func, df, key_col, K):
    func(df, key_col, K).select(
      functions.max(key_col),
      functions.min(key_col),
      functions.count(key_col)
    ).show()

让我们看看判决:

%timeit show(top_df_0, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           108|
+------------+------------+--------------+

1 loops, best of 3: 1.62 s per loop


%timeit show(top_df_1, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           100|
+------------+------------+--------------+

1 loops, best of 3: 252 ms per loop


%timeit show(top_df_2, test_df, "key_col", K=100)
+------------+------------+--------------+
|max(key_col)|min(key_col)|count(key_col)|
+------------+------------+--------------+
|         502|         420|           108|
+------------+------------+--------------+

1 loops, best of 3: 725 ms per loop

(请注意,top_df_0top_df_2 在前 100 名中有 108 个条目。这是由于存在第 100 名的并列条目。top_df_1 实现忽略了并列条目。)。

底线

如果您想要一个准确的答案,请使用 top_df_2(它比 top_df_0 好大约 2 倍)。如果您想要另一个 x2 的性能并且可以接受近似答案,请使用 top_df_1

【讨论】:

【参考方案2】:

选项:

1) 在窗口函数中使用 pyspark sql row_number - 相关 SO:spark dataframe grouping, sorting, and selecting top rows for a set of columns

2) 将有序的 df 转换为 rdd 并在那里使用 top 函数(提示:这似乎并没有真正维护我的快速测试中的排序,但是 YMMV)

【讨论】:

【参考方案3】:

您应该尝试使用head() 而不是limit()

#sample data
df = sc.parallelize([
    ['123', 'b'], ['666', 'a'],
    ['345', 'd'], ['555', 'a'],
    ['456', 'b'], ['444', 'a'],
    ['678', 'd'], ['333', 'a'],
    ['135', 'd'], ['234', 'd'],
    ['987', 'c'], ['987', 'e']
]).toDF(('col1', 'key_col'))

#select top 'n' 'key_col' values from dataframe 'df'
def retrieve_top_n(df, key, n):
    return sqlContext.createDataFrame(df.groupBy(key).count().orderBy('count', ascending=False).head(n)).select(key)

retrieve_top_n(df, 'key_col', 3).show()

希望这会有所帮助!

【讨论】:

@Jing 如果有帮助请不要忘记告诉我们! 谢谢Prem!。这确实会奏效。但是,我意识到这比我想要的要慢一些,因为我们正在收集驱动程序,然后将列表重新并行化到 DataFrame。现在我更喜欢由建议的窗口函数方法。 @加伦

以上是关于PySpark,DataFrame 的顶部的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?

pyspark dataframe数据连接(join)转化为pandas dataframe基于多个字段删除冗余数据

Pyspark Dataframe TypeError:预期的字符串或缓冲区

使用 UDF 从 PySpark Dataframe 解析嵌套的 XML 字段

为啥这个 PySpark 加入失败?

PySpark DataFrame在使用explode之前将字符串的列更改为数组