多个分区的 kafka 流式传输行为
Posted
技术标签:
【中文标题】多个分区的 kafka 流式传输行为【英文标题】:kafka streaming behaviour for more than one partition 【发布时间】:2017-06-08 16:27:09 【问题描述】:我正在消费 Kafka 主题。本主题有 3 个分区。 我正在使用 foreachRDD 处理每个批次 RDD(使用 processData 方法处理每个 RDD,并最终从中创建一个 DataSet)。
现在,您可以看到我有计数变量,并且我在“processData”方法中增加了这个计数变量,以检查我处理了多少实际记录。 (我明白,每个 RDD 都是 kafka 主题记录的集合,数量取决于批间隔大小)
现在,输出是这样的:
1 1 1 2 3 2 4 3 5 ....
这让我觉得这是因为我可能有 3 个消费者(因为我有 3 个分区),并且每个消费者都将分别调用“foreachRDD”方法,所以相同的计数被打印不止一次,因为每个消费者可能已缓存其计数副本。
但我得到的最终输出数据集包含所有记录。
那么,Spark 会在内部合并所有数据吗?它如何确定要联合的内容? 我正在尝试理解这种行为,以便形成我的逻辑
int 计数 = 0;
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<K, String>>>()
public void call(JavaRDD<ConsumerRecord<K, V>> rdd)
System.out.println("NUmber of elements in RDD : "+ rdd.count());
List<Row> rows = rdd.map(record -> processData(record))
.reduce((rows1, rows2) ->
rows1.addAll(rows2);
return rows1;
);
StructType schema = DataTypes.createStructType(fields);
Dataset ds = ss.createDataFrame(rows, schema);
ds.createOrReplaceTempView("trades");
ds.show();
);
【问题讨论】:
【参考方案1】:这些假设并不完全准确。
foreachRDD
是 Spark Streaming 中所谓的 output operations
之一。 output operations
的功能是按照batch interval
指定的时间间隔安排提供的关闭。该闭包中的代码在 spark 驱动程序上每个 batch interval
执行一次。未分布在集群中。
特别是,foreachRDD
是一个通用的output operation
,它提供对 DStream 中底层 RDD 的访问。应用于该 RDD 的操作将在 Spark 集群上执行。
所以,回到原始问题的代码,foreachRDD
闭包中的代码(例如 System.out.println("NUmber of elements in RDD : "+ rdd.count());
)在驱动程序上执行。这也是我们可以在控制台看到输出的原因。注意,这个print
中的rdd.count()
会触发集群上RDD的count
,所以count
是一个分布式操作,返回一个值给驱动,然后-在驱动上-print
操作发生。
现在是 RDD 的转换:
rdd.map(record -> processData(record))
正如我们所提到的,应用于RDD
的操作将在集群上执行。并且该执行将按照 Spark 执行模型进行;也就是说,转换被组装成阶段并应用于底层数据集的每个分区。鉴于我们正在处理 3 个 kafka 主题,我们将在 Spark 中有 3 个相应的分区。因此,processData
将应用于每个分区一次。
那么,Spark 会在内部合并所有数据吗?它如何确定要联合的内容?
与 Spark Streaming 的输出操作相同,Spark 也有操作。操作可能会将操作应用于数据并将结果提供给驱动程序。最简单的操作是collect
,它将完整的数据集提供给驱动程序,但存在可能无法放入内存的风险。其他常见操作,count
汇总数据集中的记录数,并将单个数字返回给驱动程序。
在上面的代码中,我们使用了reduce
,这也是一个应用提供的函数并将结果数据带到驱动程序的操作。正如问题中所表达的那样,使用该动作是“内部联合所有数据”。在 reduce 表达式中,我们实际上是在收集所有分布到单个本地集合中的数据。相当于这样做:rdd.map(record -> processData(record)).collect()
如果打算创建一个数据集,我们应该避免首先将所有数据“移动”到驱动程序。
更好的方法是:
val rows = rdd.map(record -> processData(record))
val df = ss.createDataFrame(rows, schema);
...
在这种情况下,所有分区的数据将保持在其所在的执行器本地。
请注意,应避免将数据移动到驱动程序。它速度很慢,并且在大型数据集的情况下可能会使作业崩溃,因为驱动程序通常无法保存集群中的所有可用数据。
【讨论】:
谢谢,解释得很好。我仍然有以下疑问:->即使我们在一个 kafka 主题中有 3 个分区,我们仍然会在批处理间隔中获得一个 RDD,并且它具有来自所有 3 个分区的数据。那你为什么说“因此,processData 将应用于每个分区一次”。 ?我的意思是,spark 唯一知道的是它有一个 RDD,它没有任何关于 kafka 分区的信息,对吧? -> 您说“我们应该避免将所有数据移动到驱动程序”:所以假设我使用“收集”,因此数据将保留在集群上。现在,如果我对此数据集执行一些 SQL 操作。我会得到所有分区的综合结果吗? @AmanpreetKhurana re:分区。分区是 Spark 内部工作的关键。无论是批处理还是流式传输。在 kafka 直接消费者的情况下,Spark 将为正在消费的 kafka 主题的每个分区创建一个 RDD 分区。 @AmanpreetKhurana re:获取数据。 “收集”会将所有数据发送给驱动程序。除非严格要求,否则请勿使用。 re: SQL: Yes - 如果你应用 SQL 操作,你会得到所有分区的组合结果。 Spark 会透明地为您处理。 最后,您建议的方法: val rows = rdd.map(record -> processData(record)) val df = ss.createDataFrame(rows, schema);实际上,我的 map 函数返回 JavaRDD>。此返回类型不能直接转换为 DataFrame。所以,我可能需要使用 collect(),有什么建议吗?以上是关于多个分区的 kafka 流式传输行为的主要内容,如果未能解决你的问题,请参考以下文章
在使用 Android 2.2 的 HTC Desire 上使用 Android MediaPlayer 进行流式传输时的奇怪行为