Spark Streaming Dataframe 执行,有状态,分区本地 groupBy,避免洗牌
Posted
技术标签:
【中文标题】Spark Streaming Dataframe 执行,有状态,分区本地 groupBy,避免洗牌【英文标题】:Spark Streaming Dataframe perform, stateful, partition local groupBy, avoiding shuffles 【发布时间】:2020-03-13 19:38:11 【问题描述】:此时感觉有些失落。
我有一个基于 Spark 2.4.2 和 Kafka 的流应用程序,它将聚合(时间窗口)流写回 Kafka:
[DF1
] 流式预分区数据帧(在 Key theKey
上),即
保证 K 到达同一个分区的流
每次。
[DF2
] 我与 DF1 连接的查找表(约 1000 行)。
GroupBy 基于 Key 和连续移动的 1 天窗口。
DF1.join(DF2, "df1.a" === "df2.b", "left")
.withWatermark("timestamp", "24 hours")
.groupBy(window('timestamp, "24 hours"), 'theKey)
.agg(collect_list('payload) as "payload")
问题: 洗牌。通过预先对数据集进行预分区(在 Kafka 中),我希望能够实现分区本地 groupBy。不幸的是,这不起作用。
问题是,在没有洗牌的情况下,实现这一目标的正确方法是什么?有吗?
到目前为止我探索过的解决方案:
-
"agg over window...":流式传输中不支持(Spark 抛出:
Non-time-based windows are not supported on streaming
DataFrames/Datasets
)
mapPartitions:不确定如何考虑状态 (mapWithState)。 mapGroupsWithState
需要 KeyValueGroupedDataset[K, V]
,该 KeyValueGroupedDataset[K, V]
仅由 GroupByKey 提供。
我正在考虑的解决方案(不情愿地):
-
dataframe 上的mapPartitions,带有自定义状态管理。然而,这使得 Spark 的
有状态的流式传输,没用。
以某种方式将原始散列分区(来自 Kafka 数据帧)插入 Spark,以便它可以永久处理随机播放(并且不使用默认的
200
),但我还没有找到明确的来源。
非常感谢任何帮助!
【问题讨论】:
尝试强制广播加入DF1.join(broadcast(DF2), ...)
。这应该可以防止洗牌并将您的密钥保留在其原始分区中。
@HristoIliev 我就是这么做的。它下降了一点,但仍然是一个相当大的洗牌。
【参考方案1】:
实际上,查找表导致了所有的洗牌。我希望 Spark 更喜欢对较大的数据集进行分区而不是较小的查找表,但事实并非如此。它采用流式数据集,忽略分区并将它们打乱到查找表分区所在的位置。
只要我按照 Streaming 数据帧对查找表进行了重新分区,Spark 就很高兴。不过,Spark 并没有将较大数据集的分区优先于较小数据集,这违反直觉。
【讨论】:
我认为您的DF2
足够小并且位于单个分区中。在这种情况下,将其合并为一个分区并尝试使用DF1.join(broadcast(DF2.coalesce(1)), ...)
进行广播连接。
这是一个很好的建议,谢谢。 DF 很小(从 1000 到 100 万行不等),但它们有很多这样的数据集,所以我不愿意将它们全部放在一个执行器上。以上是关于Spark Streaming Dataframe 执行,有状态,分区本地 groupBy,避免洗牌的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming Dataframe 执行,有状态,分区本地 groupBy,避免洗牌
Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame
将 Spark Structure Streaming DataFrames 转换为 Pandas DataFrame
如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)