如何在pyspark数据框中找到没有分组的累积频率

Posted

技术标签:

【中文标题】如何在pyspark数据框中找到没有分组的累积频率【英文标题】:How to find cumulative frequency without group by in pyspark dataframe 【发布时间】:2017-03-19 19:29:59 【问题描述】:

我在 pyspark 数据框中有一个计数列:

id   Count  Percent  
a     3       50    
b     3       50

我想要一个结果数据框:

id   Count Percent CCount CPercent  
 a     3      50       3      50  
 b     3      50       6      100

我不能使用 pandas 数据框,因为数据库非常大。 我找到了指向窗口分区的答案,但我没有这样的列来分区。 请任何人都可以告诉如何在 pyspark 数据框中执行此操作。 注意:pyspark 1.6 版

【问题讨论】:

这可能就是你要找的东西:***.com/questions/65787753/… 【参考方案1】:

窗口化方法需要将所有数据移动到一个分区中,正如您在帖子中指出的那样,您的数据集对于此操作来说太大了。为了解决这个问题,我稍微修改了这个approach。该方法在为每个分区构建偏移字典后计算每个分区的累积和。这允许计算每个分区的累积总和,同时对数据进行最少的重新洗牌:

首先让我们生成一些测试数据:

data = sc.parallelize([('a',1,25.0),('b',2,25.0),('c',3,50.0)]).toDF(['id','Count','Percent'])    

这些是我调整过的辅助方法(请参阅此处的original code)

from collections import defaultdict
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql import Window

def cumulative_sum_for_each_group_per_partition(partition_index, event_stream):
    cumulative_sum = defaultdict(float)
    for event in event_stream:
        cumulative_sum["Count"] += event["Count"]
        cumulative_sum["Percent"] += event["Percent"]
    for grp, cumulative_sum in cumulative_sum .iteritems():
        yield (grp, (partition_index, cumulative_sum))

def compute_offsets_per_group_factory(num_partitions):
    def _mapper(partial_sum_stream):
        per_partition_cumulative_sum = dict(partial_sum_stream)
        cumulative_sum = 0
        offset = 
        for partition_index in range(num_partitions):
            offset[partition_index] = cumulative_sum
            cumulative_sum += per_partition_cumulative_sum.get(partition_index, 0)
        return offset
    return _mapper

def compute_cumulative_sum_per_group_factory(global_offset):
    def _mapper(partition_index, event_stream):
        local_cumulative_sum = defaultdict(float)
        for event in event_stream:
            local_cumulative_sum["Count"] += event["Count"]
            count_cumulative_sum = local_cumulative_sum["Count"] + global_offset.value["Count"][partition_index]
            local_cumulative_sum["Percent"] += event["Percent"]
            percentage_cumulative_sum = local_cumulative_sum["Percent"] + global_offset.value["Percent"][partition_index]
            yield Row(CCount= count_cumulative_sum, CPercent = percentage_cumulative_sum, **event.asDict())
    return _mapper

def compute_cumulative_sum(points_rdd):
    # First pass to compute the cumulative offset dictionary
    compute_offsets_per_group = compute_offsets_per_group_factory(points_rdd.getNumPartitions())
    offsets_per_group = points_rdd.\
        mapPartitionsWithIndex(cumulative_sum_for_each_group_per_partition, preservesPartitioning=True).\
        groupByKey().mapValues(compute_offsets_per_group).\
        collectAsMap()
    # Second pass to compute the cumulative sum using the offset dictionary
    sc = points_rdd.context
    compute_cumulative_sum_per_group = compute_cumulative_sum_per_group_factory(sc.broadcast(offsets_per_group))
    return points_rdd.\
        mapPartitionsWithIndex(compute_cumulative_sum_per_group, preservesPartitioning=True)

在测试数据上使用这些辅助方法:

compute_cumulative_sum(data.rdd).toDF().show()

给予:

+------+--------+-----+-------+---+
|CCount|CPercent|Count|Percent| id|
+------+--------+-----+-------+---+
|   1.0|    25.0|    1|   25.0|  a|
|   3.0|    50.0|    2|   25.0|  b|
|   6.0|   100.0|    3|   50.0|  c|
+------+--------+-----+-------+---+

【讨论】:

不确定是否可以调整此代码以实现与 Pandas 在medium.com/eduonline24/… 中给出的相同。这就是我正在寻找的,但在 PySpark 中。 (根据链接的内容,我需要 SAS Proc Freq 的相同行为)。也许值得一个单独的问题。 如果你能解决它,请感谢:***.com/questions/65787753/… 嗨@Alex - 我试图将它应用到另一个问题***.com/questions/65787753/…,但没有成功。你能帮忙看看吗?感谢您的帮助!

以上是关于如何在pyspark数据框中找到没有分组的累积频率的主要内容,如果未能解决你的问题,请参考以下文章

pyspark中基于条件对多列进行分组的累积和函数

使用 pyspark 数据框中的复制名称加入后使用左表中的所有列进行分组

pySpark 数据框中的累积积

有啥方法可以在 pyspark 数据框中找到包含数据的列数

pyspark 数据框中所有列的总计数为零

在 pyspark 数据框中查找不重叠的窗口