在 Spark 中加入倾斜的数据集?
Posted
技术标签:
【中文标题】在 Spark 中加入倾斜的数据集?【英文标题】:Skewed dataset join in Spark? 【发布时间】:2016-11-02 06:18:19 【问题描述】:我正在使用 Spark RDD 加入两个大型数据集。一个数据集非常倾斜,因此很少有执行程序任务需要很长时间才能完成工作。我该如何解决这种情况?
【问题讨论】:
【参考方案1】:关于如何完成的相当不错的文章:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
短版:
将随机元素添加到大型 RDD 并使用它创建新的连接键 使用explode/flatMap 将随机元素添加到小型RDD 以增加条目数并创建新的连接键 在新的连接键上加入 RDD,由于随机播种,现在可以更好地分布【讨论】:
浏览了您提到的文章。在我的例子中,更大的表有 23500 个分区和 20 亿条记录。较小的表有 500 万条记录。我如何确定'N'的值?有什么建议吗? @kalpesh,还有一个参数需要考虑,Spark 中的 shuffle 块的大小应该小于 2GB SPARK-6235。我建议关注分区的大小,通常应该是~128MB,而不是分区数。我已经看到应用程序在许多分区 (> 32k) 上运行良好。 @kalpesh 好吧,您想在新键上创建足够多的分区,以便所有执行程序将数据加入以最大化并行性。 @LiMuBei 有一些我在 python 代码中没有得到的东西small_rdd_transformed = small_rdd.cartesian(sc.parallelize(range(0, N))).map(lambda x: ((x[0][0], x[1]), x[0][1])).coalesce(num_parts).cache() # replicate the small rdd
in scala x is a tuple2 那么 x[0][0] 是什么意思?..
我的错误是缺少mapPartitionWithIndex
【参考方案2】:
假设您必须在 A.id=B.id 上连接两个表 A 和 B。让我们假设表 A 在 id=1 上存在偏差。
即从 A 中选择 A.id 在 A.id = B.id 上加入 B
解决倾斜连接问题有两种基本方法:
方法一:
将您的查询/数据集分成 2 个部分 - 一个仅包含倾斜数据,另一个包含非倾斜数据。 在上面的例子中。查询将变为 -
1. select A.id from A join B on A.id = B.id where A.id <> 1;
2. select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
第一个查询不会有任何偏差,因此 ResultStage 的所有任务将大致同时完成。
如果我们假设 B 只有几行 B.id = 1,那么它将适合内存。所以第二个查询将被转换为广播连接。这在 Hive 中也称为 Map-side join。
参考:https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization
然后可以合并两个查询的部分结果以获得最终结果。
方法二:
上面 LeMuBei 也提到过,第二种方法尝试通过附加额外的列来随机化连接键。 步骤:
在较大的表 (A) 中添加一列,例如 skewLeft,并为所有行填充 0 到 N-1 之间的随机数。
在较小的表 (B) 中添加一列,例如 skewRight。将较小的表复制 N 次。因此,对于原始数据的每个副本,新 skewRight 列中的值将从 0 变化到 N-1。为此,您可以使用explode sql/dataset 运算符。
在 1 和 2 之后,连接 2 个数据集/表,连接条件更新为 -
*A.id = B.id && A.skewLeft = B.skewRight*
参考:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
【讨论】:
@prakharjain- 如果您可以使用一些简单的示例数据框详细说明第二种方法,那就太好了。我需要在 Pyspark 中实现它。谢谢 第二种方法不是很清楚。介意添加一些示例吗? @redTiger datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark 用一个例子解释了方法 2。【参考方案3】:根据您遇到的特定类型的偏差,可能有不同的解决方法。基本思路是:
修改您的连接列,或创建一个新的连接列,该列没有倾斜,但仍保留足够的信息来进行连接 在该非倾斜列上执行连接 - 生成的分区不会倾斜 连接后,您可以将连接列更新回您的首选格式,或者在创建新列时将其删除如果倾斜的数据参与连接,李牧北的答案中引用的“Fighting the Skew In Spark”文章是一种很好的技术。就我而言,偏斜是由连接列中的大量空值引起的。空值不参与连接,但由于连接列上的 Spark 分区,连接后的分区非常倾斜,因为有一个包含所有空值的巨大分区。
我通过添加一个新列来解决它,该列将所有空值更改为分布良好的临时值,例如“NULL_VALUE_X”,其中 X 被替换为 1 到 10,000 之间的随机数,例如(在 Java 中):
// Before the join, create a join column with well-distributed temporary values for null swids. This column
// will be dropped after the join. We need to do this so the post-join partitions will be well-distributed,
// and not have a giant partition with all null swids.
String swidWithDistributedNulls = "swid_with_distributed_nulls";
int numNullValues = 10000; // Just use a number that will always be bigger than number of partitions
Column swidWithDistributedNullsCol =
when(csDataset.col(CS_COL_SWID).isNull(), functions.concat(
functions.lit("NULL_SWID_"),
functions.round(functions.rand().multiply(numNullValues)))
)
.otherwise(csDataset.col(CS_COL_SWID));
csDataset = csDataset.withColumn(swidWithDistributedNulls, swidWithDistributedNullsCol);
然后在这个新列上加入,然后在加入之后:
outputDataset.drop(swidWithDistributedNullsCol);
【讨论】:
【参考方案4】:参考https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/ 下面是使用 Pyspark 数据帧 API 对抗 spark 偏差的代码
创建 2 个数据框:
from math import exp
from random import randint
from datetime import datetime
def count_elements(splitIndex, iterator):
n = sum(1 for _ in iterator)
yield (splitIndex, n)
def get_part_index(splitIndex, iterator):
for it in iterator:
yield (splitIndex, it)
num_parts = 18
# create the large skewed rdd
skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x))))
skewed_large_rdd = skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x))
skewed_large_df = spark.createDataFrame(skewed_large_rdd,['x','y'])
small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))
small_df = spark.createDataFrame(small_rdd,['a','b'])
大df将数据分成100个bin,小df复制100次
salt_bins = 100
from pyspark.sql import functions as F
skewed_transformed_df = skewed_large_df.withColumn('salt', (F.rand()*salt_bins).cast('int')).cache()
small_transformed_df = small_df.withColumn('replicate', F.array([F.lit(i) for i in range(salt_bins)]))
small_transformed_df = small_transformed_df.select('*', F.explode('replicate').alias('salt')).drop('replicate').cache()
终于避免了歪斜的连接
t0 = datetime.now()
result2 = skewed_transformed_df.join(small_transformed_df, (skewed_transformed_df['x'] == small_transformed_df['a']) & (skewed_transformed_df['salt'] == small_transformed_df['salt']) )
result2.count()
print "The direct join takes %s"%(str(datetime.now() - t0))
【讨论】:
【参考方案5】:Apache DataFu 有两种执行倾斜连接的方法,这些方法实现了先前答案中的一些建议。
joinSkewed 方法进行加盐(添加随机数列来拆分倾斜的值)。
broadcastJoinSkewed 方法适用于您可以将数据框划分为倾斜和常规部分,如moriarty007 的答案中的Approach 2 所述。
DataFu 中的这些方法对于使用 Spark 2.x 的项目很有用。如果您已经使用 Spark 3,则有 dedicated methods for doing skewed joins。
完全披露 - 我是 Apache DataFu 的成员。
【讨论】:
【参考方案6】:您可以尝试将“偏斜”的 RDD 重新分区到更多分区,或者尝试增加 spark.sql.shuffle.partitions
(默认为 200)。
在你的情况下,我会尝试将分区的数量设置为远高于执行者的数量。
【讨论】:
spark.sql.shuffle.partitions 不会帮助扭曲数据。将有 200 个分区,但其中只有少数有数据。 将 spark.sql.shuffle.partitions 增加到更高的数字不会帮助解决 Skew。 skewed key 对应的所有数据仍然会进入同一个 reducer,会导致速度变慢。 @prakharjain 这并不完全正确。增加分区数将减少具有许多记录的两个键放在同一分区中的机会。 spark.sql.shuffle.partitions 不会成功。加入倾斜数据将导致数据混洗的热点问题,因为连接点上的相同值将被散列到相同的散列键中,因此,所有这些行(具有相同值)将被发送到同一个执行程序在洗牌过程中。参考:oreilly.com/library/view/high-performance-spark/9781491943199/…以上是关于在 Spark 中加入倾斜的数据集?的主要内容,如果未能解决你的问题,请参考以下文章
使用用户定义的函数在 spark 中加入数据集时需要填充其他信息
hadoop2.0中加入全新的集群资源管理器,下面哪个不是yarn中的组件