如何在分区之间均匀分布值? (“反分区”)

Posted

技术标签:

【中文标题】如何在分区之间均匀分布值? (“反分区”)【英文标题】:How to spread values evenly across partitions? ("Anti-partition") 【发布时间】:2020-07-28 14:40:05 【问题描述】:

编辑 [1]:修改示例数据以准确显示情况。编辑 [2]:添加(测试)代码以显示其应如何工作.

我有一个如下所示的 (PySpark) 数据框:

示例数据(显示为 2 列):

ID  Type Other      ID  Type Other
--------------      --------------
101 A    1          105 A     1
101 A    2          105 A     2
101 B    1          105 B     1
102 A    1          105 A     4
102 A    2          106 A     1
103 A    1          106 A     2
103 A    2          201 B     1
103 B    1          201 A     1
104 A    1          202 B     1
104 A    2          203 B     1
104 A    3          203 B     2

我已经按 ID 分区了。

如何对这些数据进行分区,以使 type在多个分区中平均分布

所以,如果我只是按 ID 分区,并且我正在制作 3 个分区,我可以得到这个:

分区:

1        2        3
---      ---      ---
102 A *  101 A *  201 B *
104 A *  103 A *  202 B *
106 A *  105 A *  203 B *
         101 B *  201 A *
         103 B *
         105 B *

但是,处理 B 比处理 A 在计算上更昂贵,所以我在只有类型 B 的分区 3 上遇到了瓶颈。

理想情况下,我想像这样对数据进行分区:

1        2        3
---      ---      ---
101 A *  103 A *  105 A *
102 A *  104 A *  106 A *
201 B *  202 B *  203 B *
101 B *  103 B *  105 B *
201 A *

我该怎么做?

以下代码是我想要做的一个例子:

import random

import pyspark.sql.functions as f
from pyspark import Row
from pyspark.sql import DataFrame

from test_common.test_base import PySparkTest

RANDOM = random.Random()


def spread_values_OVER_partitions(df_input: DataFrame, concentrate_col_name: str, spread_col_name: str) -> DataFrame:
    """This method SHOULD partition a dataframe so that the first column is partitioned normally, but the "spread_col_name" column is spread over all partitions.

    Args:
        df_input:               The dataframe to partitions
        concentrate_col_name:   The column (name) on which you should (normally) partition.
        spread_col_name:        The column (name) over which values should be SPREAD over the partitions.

    Returns:
        The repartitioned dataframe.
    """

    # THIS DOES NOT WORK!
    return df_input.repartition(3, concentrate_col_name, spread_col_name)


class PartitionSpreadTest(PySparkTest):

    def test_spread_partitioning(self):
        """Test how to spread a certain columns values *OVER* partitions, instead of concentrating them."""

        test_data_tuple = [(id, 'A', other) for id in range(101, 106) for other in range(1, RANDOM.randint(3, 4))]
        test_data_tuple.extend([(id, 'B', other) for id in [104] + list(range(201, 204)) for other in range(1, RANDOM.randint(4, 5))])

        test_data_dict = ['id':    r[0],
                           'type':  r[1],
                           'other': r[2],
                           
                          for r in test_data_tuple]

        df_test = self.spark.createDataFrame(Row(**x) for x in test_data_dict)
        num_part = 3

        df_test.groupby('id', 'type').agg(f.count('id')).orderBy('id', 'type').show(100, False)

        # This DOES NOT WORK!
        df_repartitioned = spread_values_OVER_partitions(df_test, concentrate_col_name='id', spread_col_name='type')

        partition_cols = ['id', 'type']
        print(f"Num partitions: [num_part:3]: \n")
        # print partitions
        (df_repartitioned.select(
            *partition_cols,
            f.spark_partition_id().alias('part_id'))
         .distinct()
         .groupBy(*partition_cols)
         .agg(f.collect_list('part_id').alias('part_ids'))
         .withColumn('num_parts', f.size('part_ids'))
         .orderBy('part_ids', *partition_cols).select(
            *partition_cols,
            'part_ids',
            'num_parts')
         .show(1000, False))

但是,上面的代码输出如下:

+---+----+--------+---------+
|id |type|part_ids|num_parts|
+---+----+--------+---------+
|101|A   |[0]     |1        |
|104|A   |[0]     |1        |
|105|A   |[0]     |1        |
|202|B   |[0]     |1        |
|203|B   |[0]     |1        |
|104|B   |[1]     |1        |
|201|B   |[1]     |1        |
|102|A   |[2]     |1        |
|103|A   |[2]     |1        |
+---+----+--------+---------+

在这种情况下,

分区[1]包含类型B 分区[2]包含A型

这与我想要的相反

【问题讨论】:

我开始怀疑我是否需要在 ('type', 'id') 上设置一个窗口函数,为每个组合分配不同的分区 id ("row_number)。???? 我希望有一个内置的或更优雅的解决方案。 【参考方案1】:

如果您需要充分的灵活性,您也可以

将 Dataframe 转换为 RDD 如下所示应用自定义分区器 必要时返回 Dataframe

分区器

Partitioner 类用于根据键对数据进行分区。它接受两个参数 numPartitions 和 partitionFunc 来启动,如以下代码所示:

def __init__(self, numPartitions, partitionFunc):

第一个参数定义分区数,第二个参数定义分区函数。

Source

这是一个伪代码来展示基本思想:

伪代码

# obviously this will put all your values in only partition 0 so thi function should get more complex
def myPartitionerFunc(key): 
    return 0 

# when defining the key-value paired RDD you could e.g. concatenate 'ID' and 'Type' value
dfToRDD = df.rdd.map(lambda x: (x[0],x))
rdd = dfToRDD.partitionBy(3, myPartitionerFunc) 
dfPartitioned = spark.createDataFrame(rdd)

【讨论】:

这是一个有趣的方法。不幸的是,AFAIK,df rdd 之间的往返基本上会杀死我们严重依赖的 Catalyst 优化器。 是的,这对您提到的优化有重大影响。我会在我的回答中更清楚地说明,只有在您真正知道自己在做什么并且高度取决于您的用例时才应该使用这种方法。感谢您指出这一点! Mike,我也不相信有一个函数可以满足我的需要:将“正常”分区与 将另一列的值传播结合到所有分区上。你对此有什么见解吗? 我没有听说过它,也不知道有这种功能......【参考方案2】:

您可以按任意数量的列重新分区。在你的情况下,你可以这样做:

df.repartition("ID", "Type")

通过IDType 的(散列)重新分区。文档here.

但是请注意,如果Type 依赖于ID(如您的示例所示),它不会有太大变化。这个:

1        2        3
---      ---      ---
101 A *  104 A *  201 B *
102 A *  105 A *  202 B *
103 A *  106 A *  203 B *

除非你 repartition by range 单独使用 ID,否则不太可能。如果您使用标准哈希分区,则 ID(以及类型)应该随机分布在各个分区中,无论您选择什么键。

【讨论】:

这是不正确的。这集中行在same分区中具有相似的“类型”值。这正是我不想要的。 否 - 它集中了具有相同 id AND 类型的行。相同的类型,但不同的 id 将落在不同的分区中。使用更大的数据集和分区数进行检查。

以上是关于如何在分区之间均匀分布值? (“反分区”)的主要内容,如果未能解决你的问题,请参考以下文章

使用自定义分区器解决Spark DataSet数据分区不均匀的问题

如何均匀分布 JavaFX VBox 的元素

如何在Android中均匀分布单选按钮?

如何将元素水平均匀分布?

如何使用具有不同分片数量的索引在 ElasticSearch 上实现负载的均匀分布?

如何在更高维度的超球面上均匀分布点?